/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module thrift.async.socket;

import core.thread : Fiber;
import core.time : dur, Duration;
import std.array : empty;
import std.conv : to;
import std.exception : enforce;
import std.socket;
import thrift.base;
import thrift.async.base;
import thrift.transport.base;
import thrift.transport.socket : TSocketBase;
import thrift.internal.endian;
import thrift.internal.socket;

version (Windows) {
  import core.sys.windows.winsock2 : connect;
} else version (Posix) {
  import core.sys.posix.sys.socket : connect;
} else static assert(0, "Don't know connect on this platform.");

version (Win32) {
  import core.stdc.config : __c_long;
}

/**
 * Non-blocking socket implementation of the TTransport interface.
 *
 * Whenever a socket operation would block, TAsyncSocket registers a callback
 * with the specified TAsyncSocketManager and yields.
 *
 * As for thrift.transport.socket, due to the limitations of std.socket,
 * currently only TCP/IP sockets are supported (i.e. Unix domain sockets are
 * not).
 */
class TAsyncSocket : TSocketBase, TAsyncTransport {
  /**
   * Constructor that takes an already created, connected (!) socket.
   *
   * Params:
   *   asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
   *   socket = Already created, connected socket object. Will be switched to
   *     non-blocking mode if it isn't already.
   */
  this(TAsyncSocketManager asyncManager, Socket socket) {
    asyncManager_ = asyncManager;
    socket.blocking = false;
    super(socket);
  }

  /**
   * Creates a new unconnected socket that will connect to the given host
   * on the given port.
   *
   * Params:
   *   asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
   *   host = Remote host.
   *   port = Remote port.
   */
  this(TAsyncSocketManager asyncManager, string host, ushort port) {
    asyncManager_ = asyncManager;
    super(host, port);
  }

  /// The async manager this socket registers its work items and listeners
  /// with.
  override TAsyncManager asyncManager() @property {
    return asyncManager_;
  }

  /**
   * Asynchronously connects the socket.
   *
   * Completes without blocking and defers further operations on the socket
   * until the connection is established. If connecting fails in the
   * background, this is currently not indicated in any way other than every
   * call to read/write failing.
   *
   * Throws: TTransportException (NOT_OPEN) if host or port are not set, if
   *   the host cannot be resolved, or if connect() fails right away with an
   *   error other than "connection in progress". In the latter case the
   *   freshly created socket is closed again before throwing.
   */
  override void open() {
    if (isOpen) return;

    enforce(!host_.empty, new TTransportException(
      "Cannot open null host.", TTransportException.Type.NOT_OPEN));
    enforce(port_ != 0, new TTransportException(
      "Cannot open with null port.", TTransportException.Type.NOT_OPEN));

    // Cannot use std.socket.Socket.connect here because it hides away
    // EINPROGRESS/WSAWOULDBLOCK.
    Address addr;
    try {
      // Currently, we just go with the first address returned, could be made
      // more intelligent though – IPv6?
      auto candidates = getAddress(host_, port_);
      // Indexing an empty array would raise a RangeError, which the handler
      // below does not catch, so turn that case into a regular exception.
      enforce(!candidates.empty, "No addresses returned for host.");
      addr = candidates[0];
    } catch (Exception e) {
      throw new TTransportException(`Unable to resolve host "` ~ host_ ~ `".`,
        TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
    }

    socket_ = new TcpSocket(addr.addressFamily);
    socket_.blocking = false;
    setSocketOpts();

    auto connectResult = connect(socket_.handle, addr.name(), addr.nameLen());
    if (connectResult == 0) {
      // If the connection could be established immediately, just return. I
      // don't know if this ever happens.
      return;
    }

    // Fetch the error code before touching the socket again, as closing it
    // could overwrite the thread's last socket error.
    auto connectErrno = getSocketErrno();
    if (connectErrno != CONNECT_INPROGRESS_ERRNO) {
      // Release the socket so that the descriptor is not leaked and socket_
      // does not keep referring to a socket that never connected.
      closeImmediately();
      throw new TTransportException(`Could not establish connection to "` ~
        host_ ~ `": ` ~ socketErrnoString(connectErrno),
        TTransportException.Type.NOT_OPEN);
    }

    // This is the expected case: connect() signalled that the connection
    // is being established in the background. Queue up a work item with the
    // async manager which just defers any other operations on this
    // TAsyncSocket instance until the socket is ready.
    asyncManager_.execute(this,
      {
        auto fiber = Fiber.getThis();
        TAsyncEventReason reason = void;
        asyncManager_.addOneshotListener(socket_, TAsyncEventType.WRITE,
          connectTimeout,
          scopedDelegate((TAsyncEventReason r){ reason = r; fiber.call(); })
        );
        Fiber.yield();

        if (reason == TAsyncEventReason.TIMED_OUT) {
          // Close the connection, so that subsequent work items fail immediately.
          closeImmediately();
          return;
        }

        // The socket became writable; SO_ERROR tells whether the background
        // connect actually succeeded.
        version (Win32) {
          __c_long errorCode = void;
        } else {
          int errorCode = void;
        }
        socket_.getOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_ERROR,
          errorCode);

        if (errorCode) {
          logInfo("Could not connect TAsyncSocket: %s",
            socketErrnoString(errorCode));

          // Close the connection, so that subsequent work items fail immediately.
          closeImmediately();
          return;
        }
      }
    );
  }

  /**
   * Closes the socket.
   *
   * Will block until all currently active operations are finished before the
   * socket is closed.
   */
  override void close() {
    if (!isOpen) return;

    import core.sync.condition;
    import core.sync.mutex;

    auto doneMutex = new Mutex;
    auto doneCond = new Condition(doneMutex);

    // Completion flag guarded by doneMutex. Waiting on the flag instead of on
    // the bare condition protects against spurious wakeups (which would let
    // this frame go away while the scoped delegate still refers to it) and
    // against the work item having finished before wait() is reached.
    bool done = false;

    synchronized (doneMutex) {
      asyncManager_.execute(this,
        scopedDelegate(
          {
            closeImmediately();
            synchronized (doneMutex) {
              done = true;
              doneCond.notifyAll();
            }
          }
        )
      );
      while (!done) doneCond.wait();
    }
  }

  /**
   * Checks for pending data without consuming it, by receiving a single byte
   * with SocketFlags.PEEK.
   *
   * Returns: false if the socket is not open, or if the peer reset the
   *   connection on platforms where connresetOnPeerShutdown is set (the
   *   socket is closed in that case); otherwise whether at least one byte
   *   could be peeked.
   *
   * Throws: TTransportException (UNKNOWN) on any other receive error.
   *
   * NOTE(review): this does not go through yieldOnBlock, so on this
   * non-blocking socket a would-block result presumably also ends up in the
   * exception path – confirm whether callers rely on that.
   */
  override bool peek() {
    if (!isOpen) return false;

    ubyte buf;
    auto r = socket_.receive((&buf)[0..1], SocketFlags.PEEK);
    if (r == Socket.ERROR) {
      auto lastErrno = getSocketErrno();
      static if (connresetOnPeerShutdown) {
        if (lastErrno == ECONNRESET) {
          closeImmediately();
          return false;
        }
      }
      throw new TTransportException("Peeking into socket failed: " ~
        socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
    }
    return (r > 0);
  }

  /**
   * Reads up to buf.length bytes from the socket, yielding the current fiber
   * to the async manager while the operation would block.
   *
   * Params:
   *   buf = Buffer to receive the data into.
   *
   * Returns: The number of bytes read; 0 if the connection was reset and
   *   connresetOnPeerShutdown is set.
   *
   * Throws: TTransportException if the socket is not open (NOT_OPEN), if
   *   waiting for data timed out (TIMED_OUT), or on any other receive error
   *   (UNKNOWN).
   */
  override size_t read(ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN));

    typeof(getSocketErrno()) lastErrno;

    auto r = yieldOnBlock(socket_.receive(cast(void[])buf),
      TAsyncEventType.READ);

    // If recv went fine, immediately return.
    if (r >= 0) return r;

    // Something went wrong, find out how to handle it.
    lastErrno = getSocketErrno();

    static if (connresetOnPeerShutdown) {
      // When connresetOnPeerShutdown is set, a connection reset is reported
      // to the caller like a regular end of stream (0 bytes read).
      if (lastErrno == ECONNRESET) {
        return 0;
      }
    }

    throw new TTransportException("Receiving from socket failed: " ~
      socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
  }

  /**
   * Writes the whole buffer to the socket, calling writeSome() repeatedly
   * until every byte has been sent.
   *
   * Params:
   *   buf = Data to send.
   *
   * Throws: Whatever writeSome() throws.
   */
  override void write(in ubyte[] buf) {
    size_t sent;
    while (sent < buf.length) {
      sent += writeSome(buf[sent .. $]);
    }
    assert(sent == buf.length);
  }

  /**
   * Sends as much of the buffer as the socket accepts in a single send()
   * call, yielding the current fiber to the async manager while the
   * operation would block.
   *
   * Params:
   *   buf = Data to send.
   *
   * Returns: The number of bytes actually written (always greater than 0).
   *
   * Throws: TTransportException if the socket is not open (NOT_OPEN), if
   *   waiting for the socket timed out (TIMED_OUT), if send() failed (NOT_OPEN
   *   when the error indicates a closed connection, in which case the socket
   *   is closed as well; UNKNOWN otherwise), or if send() reported 0 bytes
   *   written (UNKNOWN).
   */
  override size_t writeSome(in ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN));

    auto r = yieldOnBlock(socket_.send(buf), TAsyncEventType.WRITE);

    // Everything went well, just return the number of bytes written.
    if (r > 0) return r;

    // Handle error conditions.
    if (r < 0) {
      auto lastErrno = getSocketErrno();

      auto type = TTransportException.Type.UNKNOWN;
      if (isSocketCloseErrno(lastErrno)) {
        type = TTransportException.Type.NOT_OPEN;
        closeImmediately();
      }

      throw new TTransportException("Sending to socket failed: " ~
        socketErrnoString(lastErrno), type);
    }

    // send() should never return 0.
    throw new TTransportException("Sending to socket failed (0 bytes written).",
      TTransportException.Type.UNKNOWN);
  }

  /// The amount of time in which a connection must be established before the
  /// open() call times out.
  Duration connectTimeout = dur!"seconds"(5);

private:
  /**
   * Closes the underlying socket right away, without waiting for queued work
   * items, and resets socket_ to null.
   *
   * Does nothing if the socket has already been closed, so that a close
   * request queued behind a work item which already closed the socket (e.g.
   * after a failed connect) does not dereference a null socket.
   */
  void closeImmediately() {
    if (socket_ is null) return;
    socket_.close();
    socket_ = null;
  }

  /**
   * Evaluates the given socket call, and while it fails because it would
   * block, registers a one-shot listener for eventType with the async manager
   * and yields the current fiber until the listener fires, then retries.
   *
   * Params:
   *   call = Lazily evaluated socket operation; re-evaluated on every retry.
   *   eventType = The event to wait for; also selects which timeout applies
   *     (recvTimeout_ for READ, sendTimeout_ for WRITE).
   *
   * Returns: The first result of call that is not a would-block error.
   *
   * Throws: TTransportException (TIMED_OUT) if the listener reported a
   *   timeout; the socket is closed before throwing.
   */
  T yieldOnBlock(T)(lazy T call, TAsyncEventType eventType) {
    while (true) {
      auto result = call();
      if (result != Socket.ERROR || getSocketErrno() != WOULD_BLOCK_ERRNO) return result;

      // We got an EAGAIN result, register a callback to return here once some
      // event happens and yield.

      Duration timeout = void;
      final switch (eventType) {
        case TAsyncEventType.READ:
          timeout = recvTimeout_;
          break;
        case TAsyncEventType.WRITE:
          timeout = sendTimeout_;
          break;
      }

      auto fiber = Fiber.getThis();
      assert(fiber, "Current fiber null – not running in TAsyncManager?");
      TAsyncEventReason eventReason = void;
      asyncManager_.addOneshotListener(socket_, eventType, timeout,
        scopedDelegate((TAsyncEventReason reason) {
          eventReason = reason;
          fiber.call();
        })
      );

      // Yields execution back to the async manager, will return back here once
      // the above listener is called.
      Fiber.yield();

      if (eventReason == TAsyncEventReason.TIMED_OUT) {
        // If we are cancelling the request due to a timed out operation, the
        // connection is in an undefined state, because the server could decide
        // to send the requested data later, or we could have already been half-
        // way into writing a request. Thus, we close the connection to make any
        // possibly queued up work items fail immediately. Besides, the server
        // is not very likely to immediately recover after a socket-level
        // timeout has expired anyway.
        closeImmediately();

        throw new TTransportException("Timed out while waiting for socket " ~
          "to get ready to " ~ to!string(eventType) ~ ".",
          TTransportException.Type.TIMED_OUT);
      }
    }
  }

  /// The TAsyncSocketManager to use for non-blocking I/O.
  TAsyncSocketManager asyncManager_;
}

private {
  // std.socket doesn't include SO_ERROR for reasons unknown.
  version (linux) {
    enum SO_ERROR = 4;
  } else version (OSX) {
    enum SO_ERROR = 0x1007;
  } else version (FreeBSD) {
    enum SO_ERROR = 0x1007;
  } else version (Windows) {
    import core.sys.windows.winsock2 : SO_ERROR;
  } else static assert(false, "Don't know SO_ERROR on this platform.");

  // This hack forces a delegate literal to be scoped, even if it is passed to
  // a function accepting normal delegates as well. DMD likes to allocate the
  // context on the heap anyway, but it seems to work for LDC.
  import std.traits : isDelegate;
  auto scopedDelegate(D)(scope D d) if (isDelegate!D) {
    return d;
  }
}