1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 module thrift.async.socket; 20 21 import core.stdc.errno: ECONNRESET; 22 import core.thread : Fiber; 23 import core.time : dur, Duration; 24 import std.array : empty; 25 import std.conv : to; 26 import std.exception : enforce; 27 import std.socket; 28 import thrift.base; 29 import thrift.async.base; 30 import thrift.transport.base; 31 import thrift.transport.socket : TSocketBase; 32 import thrift.internal.endian; 33 import thrift.internal.socket; 34 35 version (Windows) { 36 import std.c.windows.winsock : connect; 37 } else version (Posix) { 38 import core.sys.posix.sys.socket : connect; 39 } else static assert(0, "Don't know connect on this platform."); 40 41 /** 42 * Non-blocking socket implementation of the TTransport interface. 43 * 44 * Whenever a socket operation would block, TAsyncSocket registers a callback 45 * with the specified TAsyncSocketManager and yields. 46 * 47 * As for thrift.transport.socket, due to the limitations of std.socket, 48 * currently only TCP/IP sockets are supported (i.e. Unix domain sockets are 49 * not). 50 */ 51 class TAsyncSocket : TSocketBase, TAsyncTransport { 52 /** 53 * Constructor that takes an already created, connected (!) socket. 54 * 55 * Params: 56 * asyncManager = The TAsyncSocketManager to use for non-blocking I/O. 57 * socket = Already created, connected socket object. Will be switched to 58 * non-blocking mode if it isn't already. 59 */ 60 this(TAsyncSocketManager asyncManager, Socket socket) { 61 asyncManager_ = asyncManager; 62 socket.blocking = false; 63 super(socket); 64 } 65 66 /** 67 * Creates a new unconnected socket that will connect to the given host 68 * on the given port. 69 * 70 * Params: 71 * asyncManager = The TAsyncSocketManager to use for non-blocking I/O. 72 * host = Remote host. 73 * port = Remote port. 74 */ 75 this(TAsyncSocketManager asyncManager, string host, ushort port) { 76 asyncManager_ = asyncManager; 77 super(host, port); 78 } 79 80 override TAsyncManager asyncManager() @property { 81 return asyncManager_; 82 } 83 84 /** 85 * Asynchronously connects the socket. 86 * 87 * Completes without blocking and defers further operations on the socket 88 * until the connection is established. If connecting fails, this is 89 * currently not indicated in any way other than every call to read/write 90 * failing. 91 */ 92 override void open() { 93 if (isOpen) return; 94 95 enforce(!host_.empty, new TTransportException( 96 "Cannot open null host.", TTransportException.Type.NOT_OPEN)); 97 enforce(port_ != 0, new TTransportException( 98 "Cannot open with null port.", TTransportException.Type.NOT_OPEN)); 99 100 101 // Cannot use std.socket.Socket.connect here because it hides away 102 // EINPROGRESS/WSAWOULDBLOCK. 103 Address addr; 104 try { 105 // Currently, we just go with the first address returned, could be made 106 // more intelligent though – IPv6? 107 addr = getAddress(host_, port_)[0]; 108 } catch (Exception e) { 109 throw new TTransportException(`Unable to resolve host "` ~ host_ ~ `".`, 110 TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e); 111 } 112 113 socket_ = new TcpSocket(addr.addressFamily); 114 socket_.blocking = false; 115 setSocketOpts(); 116 117 auto errorCode = connect(socket_.handle, addr.name(), addr.nameLen()); 118 if (errorCode == 0) { 119 // If the connection could be established immediately, just return. I 120 // don't know if this ever happens. 121 return; 122 } 123 124 auto errno = getSocketErrno(); 125 if (errno != CONNECT_INPROGRESS_ERRNO) { 126 throw new TTransportException(`Could not establish connection to "` ~ 127 host_ ~ `": ` ~ socketErrnoString(errno), 128 TTransportException.Type.NOT_OPEN); 129 } 130 131 // This is the expected case: connect() signalled that the connection 132 // is being established in the background. Queue up a work item with the 133 // async manager which just defers any other operations on this 134 // TAsyncSocket instance until the socket is ready. 135 asyncManager_.execute(this, 136 { 137 auto fiber = Fiber.getThis(); 138 TAsyncEventReason reason = void; 139 asyncManager_.addOneshotListener(socket_, TAsyncEventType.WRITE, 140 connectTimeout, 141 scopedDelegate((TAsyncEventReason r){ reason = r; fiber.call(); }) 142 ); 143 Fiber.yield(); 144 145 if (reason == TAsyncEventReason.TIMED_OUT) { 146 // Close the connection, so that subsequent work items fail immediately. 147 closeImmediately(); 148 return; 149 } 150 151 int errorCode = void; 152 socket_.getOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_ERROR, 153 errorCode); 154 155 if (errorCode) { 156 logInfo("Could not connect TAsyncSocket: %s", 157 socketErrnoString(errorCode)); 158 159 // Close the connection, so that subsequent work items fail immediately. 160 closeImmediately(); 161 return; 162 } 163 164 } 165 ); 166 } 167 168 /** 169 * Closes the socket. 170 * 171 * Will block until all currently active operations are finished before the 172 * socket is closed. 173 */ 174 override void close() { 175 if (!isOpen) return; 176 177 import core.sync.condition; 178 import core.sync.mutex; 179 180 auto doneMutex = new Mutex; 181 auto doneCond = new Condition(doneMutex); 182 synchronized (doneMutex) { 183 asyncManager_.execute(this, 184 scopedDelegate( 185 { 186 closeImmediately(); 187 synchronized (doneMutex) doneCond.notifyAll(); 188 } 189 ) 190 ); 191 doneCond.wait(); 192 } 193 } 194 195 override bool peek() { 196 if (!isOpen) return false; 197 198 ubyte buf; 199 auto r = socket_.receive((&buf)[0..1], SocketFlags.PEEK); 200 if (r == Socket.ERROR) { 201 auto lastErrno = getSocketErrno(); 202 static if (connresetOnPeerShutdown) { 203 if (lastErrno == ECONNRESET) { 204 closeImmediately(); 205 return false; 206 } 207 } 208 throw new TTransportException("Peeking into socket failed: " ~ 209 socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN); 210 } 211 return (r > 0); 212 } 213 214 override size_t read(ubyte[] buf) { 215 enforce(isOpen, new TTransportException( 216 "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN)); 217 218 typeof(getSocketErrno()) lastErrno; 219 220 auto r = yieldOnBlock(socket_.receive(cast(void[])buf), 221 TAsyncEventType.READ); 222 223 // If recv went fine, immediately return. 224 if (r >= 0) return r; 225 226 // Something went wrong, find out how to handle it. 227 lastErrno = getSocketErrno(); 228 229 static if (connresetOnPeerShutdown) { 230 // See top comment. 231 if (lastErrno == ECONNRESET) { 232 return 0; 233 } 234 } 235 236 throw new TTransportException("Receiving from socket failed: " ~ 237 socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN); 238 } 239 240 override void write(in ubyte[] buf) { 241 size_t sent; 242 while (sent < buf.length) { 243 sent += writeSome(buf[sent .. $]); 244 } 245 assert(sent == buf.length); 246 } 247 248 override size_t writeSome(in ubyte[] buf) { 249 enforce(isOpen, new TTransportException( 250 "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN)); 251 252 auto r = yieldOnBlock(socket_.send(buf), TAsyncEventType.WRITE); 253 254 // Everything went well, just return the number of bytes written. 255 if (r > 0) return r; 256 257 // Handle error conditions. 258 if (r < 0) { 259 auto lastErrno = getSocketErrno(); 260 261 auto type = TTransportException.Type.UNKNOWN; 262 if (isSocketCloseErrno(lastErrno)) { 263 type = TTransportException.Type.NOT_OPEN; 264 closeImmediately(); 265 } 266 267 throw new TTransportException("Sending to socket failed: " ~ 268 socketErrnoString(lastErrno), type); 269 } 270 271 // send() should never return 0. 272 throw new TTransportException("Sending to socket failed (0 bytes written).", 273 TTransportException.Type.UNKNOWN); 274 } 275 276 /// The amount of time in which a conncetion must be established before the 277 /// open() call times out. 278 Duration connectTimeout = dur!"seconds"(5); 279 280 private: 281 void closeImmediately() { 282 socket_.close(); 283 socket_ = null; 284 } 285 286 T yieldOnBlock(T)(lazy T call, TAsyncEventType eventType) { 287 while (true) { 288 auto result = call(); 289 if (result != Socket.ERROR || getSocketErrno() != WOULD_BLOCK_ERRNO) return result; 290 291 // We got an EAGAIN result, register a callback to return here once some 292 // event happens and yield. 293 294 Duration timeout = void; 295 final switch (eventType) { 296 case TAsyncEventType.READ: 297 timeout = recvTimeout_; 298 break; 299 case TAsyncEventType.WRITE: 300 timeout = sendTimeout_; 301 break; 302 } 303 304 auto fiber = Fiber.getThis(); 305 assert(fiber, "Current fiber null – not running in TAsyncManager?"); 306 TAsyncEventReason eventReason = void; 307 asyncManager_.addOneshotListener(socket_, eventType, timeout, 308 scopedDelegate((TAsyncEventReason reason) { 309 eventReason = reason; 310 fiber.call(); 311 }) 312 ); 313 314 // Yields execution back to the async manager, will return back here once 315 // the above listener is called. 316 Fiber.yield(); 317 318 if (eventReason == TAsyncEventReason.TIMED_OUT) { 319 // If we are cancelling the request due to a timed out operation, the 320 // connection is in an undefined state, because the server could decide 321 // to send the requested data later, or we could have already been half- 322 // way into writing a request. Thus, we close the connection to make any 323 // possibly queued up work items fail immediately. Besides, the server 324 // is not very likely to immediately recover after a socket-level 325 // timeout has expired anyway. 326 closeImmediately(); 327 328 throw new TTransportException("Timed out while waiting for socket " ~ 329 "to get ready to " ~ to!string(eventType) ~ ".", 330 TTransportException.Type.TIMED_OUT); 331 } 332 } 333 } 334 335 /// The TAsyncSocketManager to use for non-blocking I/O. 336 TAsyncSocketManager asyncManager_; 337 } 338 339 private { 340 // std.socket doesn't include SO_ERROR for reasons unknown. 341 version (linux) { 342 enum SO_ERROR = 4; 343 } else version (OSX) { 344 enum SO_ERROR = 0x1007; 345 } else version (FreeBSD) { 346 enum SO_ERROR = 0x1007; 347 } else version (Win32) { 348 import std.c.windows.winsock : SO_ERROR; 349 } else static assert(false, "Don't know SO_ERROR on this platform."); 350 351 // This hack forces a delegate literal to be scoped, even if it is passed to 352 // a function accepting normal delegates as well. DMD likes to allocate the 353 // context on the heap anyway, but it seems to work for LDC. 354 import std.traits : isDelegate; 355 auto scopedDelegate(D)(scope D d) if (isDelegate!D) { 356 return d; 357 } 358 }