/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module thrift.async.socket;

import core.thread : Fiber;
import core.time : dur, Duration;
import std.array : empty;
import std.conv : to;
import std.exception : enforce;
import std.socket;
import thrift.base;
import thrift.async.base;
import thrift.transport.base;
import thrift.transport.socket : TSocketBase;
import thrift.internal.endian;
import thrift.internal.socket;

version (Windows) {
  import core.sys.windows.winsock2 : connect;
} else version (Posix) {
  import core.sys.posix.sys.socket : connect;
} else static assert(0, "Don't know connect on this platform.");

version (OSX) {
  import core.stdc.errno : ECONNRESET;
}

version (Win32) {
  import core.stdc.config : __c_long;
}

/**
 * Non-blocking socket implementation of the TTransport interface.
 *
 * Whenever a socket operation would block, TAsyncSocket registers a callback
 * with the specified TAsyncSocketManager and yields.
 *
 * As for thrift.transport.socket, due to the limitations of std.socket,
 * currently only TCP/IP sockets are supported (i.e. Unix domain sockets are
 * not).
 */
class TAsyncSocket : TSocketBase, TAsyncTransport {
  /**
   * Constructor that takes an already created, connected (!) socket.
   *
   * Params:
   *   asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
   *   socket = Already created, connected socket object. Will be switched to
   *     non-blocking mode if it isn't already.
   */
  this(TAsyncSocketManager asyncManager, Socket socket) {
    asyncManager_ = asyncManager;
    socket.blocking = false;
    super(socket);
  }

  /**
   * Creates a new unconnected socket that will connect to the given host
   * on the given port.
   *
   * Params:
   *   asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
   *   host = Remote host.
   *   port = Remote port.
   */
  this(TAsyncSocketManager asyncManager, string host, ushort port) {
    asyncManager_ = asyncManager;
    super(host, port);
  }

  /// The async manager this socket registers its work items and listeners with.
  override TAsyncManager asyncManager() @property {
    return asyncManager_;
  }

  /**
   * Asynchronously connects the socket.
   *
   * Completes without blocking and defers further operations on the socket
   * until the connection is established. If connecting fails in the
   * background, this is currently not indicated in any way other than every
   * call to read/write failing.
   *
   * Throws: TTransportException (NOT_OPEN) if host or port are unset, if the
   *   host cannot be resolved, or if connect() fails right away with an error
   *   other than "connection in progress".
   */
  override void open() {
    if (isOpen) return;

    enforce(!host_.empty, new TTransportException(
      "Cannot open null host.", TTransportException.Type.NOT_OPEN));
    enforce(port_ != 0, new TTransportException(
      "Cannot open with null port.", TTransportException.Type.NOT_OPEN));

    // Cannot use std.socket.Socket.connect here because it hides away
    // EINPROGRESS/WSAWOULDBLOCK.
    Address addr;
    try {
      // Currently, we just go with the first address returned, could be made
      // more intelligent though – IPv6?
      addr = getAddress(host_, port_)[0];
    } catch (Exception e) {
      throw new TTransportException(`Unable to resolve host "` ~ host_ ~ `".`,
        TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
    }

    socket_ = new TcpSocket(addr.addressFamily);
    socket_.blocking = false;
    setSocketOpts();

    auto connectResult = connect(socket_.handle, addr.name(), addr.nameLen());
    if (connectResult == 0) {
      // If the connection could be established immediately, just return. I
      // don't know if this ever happens.
      return;
    }

    // Fetch the error code before doing anything else with the socket, so
    // that it cannot be overwritten by a later call.
    auto connectErrno = getSocketErrno();
    if (connectErrno != CONNECT_INPROGRESS_ERRNO) {
      // The connection attempt failed outright. Release the socket we just
      // created instead of leaving a dead handle stored in socket_.
      closeImmediately();
      throw new TTransportException(`Could not establish connection to "` ~
        host_ ~ `": ` ~ socketErrnoString(connectErrno),
        TTransportException.Type.NOT_OPEN);
    }

    // This is the expected case: connect() signalled that the connection
    // is being established in the background. Queue up a work item with the
    // async manager which just defers any other operations on this
    // TAsyncSocket instance until the socket is ready.
    asyncManager_.execute(this,
      {
        auto fiber = Fiber.getThis();
        TAsyncEventReason reason = void;
        asyncManager_.addOneshotListener(socket_, TAsyncEventType.WRITE,
          connectTimeout,
          scopedDelegate((TAsyncEventReason r){ reason = r; fiber.call(); })
        );
        Fiber.yield();

        if (reason == TAsyncEventReason.TIMED_OUT) {
          // Close the connection, so that subsequent work items fail immediately.
          closeImmediately();
          return;
        }

        // The socket became writable; SO_ERROR tells whether the background
        // connection attempt actually succeeded.
        version (Win32) {
          __c_long soError = void;
        } else {
          int soError = void;
        }
        socket_.getOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_ERROR,
          soError);

        if (soError) {
          logInfo("Could not connect TAsyncSocket: %s",
            socketErrnoString(soError));

          // Close the connection, so that subsequent work items fail immediately.
          closeImmediately();
          return;
        }
      }
    );
  }

  /**
   * Closes the socket.
   *
   * Will block until all currently active operations are finished before the
   * socket is closed: the actual close is queued as a work item with the
   * async manager and this method waits for that work item to signal
   * completion.
   */
  override void close() {
    if (!isOpen) return;

    import core.sync.condition;
    import core.sync.mutex;

    auto doneMutex = new Mutex;
    auto doneCond = new Condition(doneMutex);
    // Guarded by doneMutex. Waiting on an explicit flag (instead of a single
    // bare wait()) keeps us correct in the face of spurious wakeups and if the
    // work item should complete before we start waiting.
    bool done = false;
    synchronized (doneMutex) {
      asyncManager_.execute(this,
        scopedDelegate(
          {
            closeImmediately();
            synchronized (doneMutex) {
              done = true;
              doneCond.notifyAll();
            }
          }
        )
      );
      while (!done) doneCond.wait();
    }
  }

  /**
   * Peeks at the socket without consuming data.
   *
   * Returns: false if the socket is not open, if the peek yielded no bytes,
   *   or (where connresetOnPeerShutdown is set) if the peer reset the
   *   connection, in which case the socket is also closed; true if at least
   *   one byte could be peeked.
   *
   * Throws: TTransportException (UNKNOWN) on any other socket error.
   */
  override bool peek() {
    if (!isOpen) return false;

    ubyte buf;
    auto r = socket_.receive((&buf)[0..1], SocketFlags.PEEK);
    if (r == Socket.ERROR) {
      auto lastErrno = getSocketErrno();
      static if (connresetOnPeerShutdown) {
        if (lastErrno == ECONNRESET) {
          closeImmediately();
          return false;
        }
      }
      // NOTE(review): unlike read()/writeSome(), this does not go through
      // yieldOnBlock(), so a "would block" result on the non-blocking socket
      // ends up here as an exception as well – confirm that this is intended.
      throw new TTransportException("Peeking into socket failed: " ~
        socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
    }
    return (r > 0);
  }

  /**
   * Reads up to buf.length bytes from the socket, yielding the current fiber
   * to the async manager while the operation would block.
   *
   * Params:
   *   buf = Buffer to receive the data into.
   *
   * Returns: The number of bytes received. Where connresetOnPeerShutdown is
   *   set, a connection reset by the peer is reported as 0.
   *
   * Throws: TTransportException if the socket is not open (NOT_OPEN), if the
   *   receive timeout expires (TIMED_OUT) or on other socket errors (UNKNOWN).
   */
  override size_t read(ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN));

    auto r = yieldOnBlock(socket_.receive(cast(void[])buf),
      TAsyncEventType.READ);

    // If recv went fine, immediately return.
    if (r >= 0) return r;

    // Something went wrong, find out how to handle it.
    auto lastErrno = getSocketErrno();

    static if (connresetOnPeerShutdown) {
      // On these platforms, a reset by the peer is reported to the caller as
      // a zero-length read rather than as an error.
      if (lastErrno == ECONNRESET) {
        return 0;
      }
    }

    throw new TTransportException("Receiving from socket failed: " ~
      socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
  }

  /**
   * Writes the whole buffer to the socket by calling writeSome() until
   * everything has been sent.
   *
   * Params:
   *   buf = Data to send.
   *
   * Throws: TTransportException, see writeSome().
   */
  override void write(in ubyte[] buf) {
    size_t sent;
    while (sent < buf.length) {
      sent += writeSome(buf[sent .. $]);
    }
    assert(sent == buf.length);
  }

  /**
   * Writes as much of the buffer as the socket accepts in a single send
   * call, yielding the current fiber to the async manager while the
   * operation would block.
   *
   * Params:
   *   buf = Data to send.
   *
   * Returns: The number of bytes written, always greater than zero.
   *
   * Throws: TTransportException if the socket is not open (NOT_OPEN), if the
   *   send timeout expires (TIMED_OUT), if the connection turned out to be
   *   closed (NOT_OPEN; the socket is closed as well), or on other errors
   *   including a zero-byte send (UNKNOWN).
   */
  override size_t writeSome(in ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN));

    auto r = yieldOnBlock(socket_.send(buf), TAsyncEventType.WRITE);

    // Everything went well, just return the number of bytes written.
    if (r > 0) return r;

    // Handle error conditions.
    if (r < 0) {
      auto lastErrno = getSocketErrno();

      auto type = TTransportException.Type.UNKNOWN;
      if (isSocketCloseErrno(lastErrno)) {
        type = TTransportException.Type.NOT_OPEN;
        closeImmediately();
      }

      throw new TTransportException("Sending to socket failed: " ~
        socketErrnoString(lastErrno), type);
    }

    // send() should never return 0.
    throw new TTransportException("Sending to socket failed (0 bytes written).",
      TTransportException.Type.UNKNOWN);
  }

  /// The amount of time in which a connection must be established before the
  /// open() call times out.
  Duration connectTimeout = dur!"seconds"(5);

private:
  /**
   * Closes the underlying socket right away and clears the reference to it.
   *
   * Does nothing if the socket has already been closed, so that work items
   * which were queued before an earlier failure closed the socket (such as
   * the one queued by close()) do not dereference null.
   */
  void closeImmediately() {
    if (socket_ is null) return;
    socket_.close();
    socket_ = null;
  }

  /**
   * Evaluates the given socket call, and as long as it fails with a "would
   * block" error, registers a one-shot listener for the given event type with
   * the async manager, yields the current fiber and retries once resumed.
   *
   * Params:
   *   call = Lazily evaluated socket operation; re-evaluated on every retry.
   *   eventType = Event to wait for; also selects between the receive and
   *     the send timeout.
   *
   * Returns: The first result of call that is not a "would block" error.
   *
   * Throws: TTransportException (TIMED_OUT) if the listener reports a
   *   timeout; the socket is closed in that case.
   */
  T yieldOnBlock(T)(lazy T call, TAsyncEventType eventType) {
    while (true) {
      auto result = call();
      if (result != Socket.ERROR || getSocketErrno() != WOULD_BLOCK_ERRNO) return result;

      // We got an EAGAIN result, register a callback to return here once some
      // event happens and yield.

      Duration timeout = void;
      final switch (eventType) {
        case TAsyncEventType.READ:
          timeout = recvTimeout_;
          break;
        case TAsyncEventType.WRITE:
          timeout = sendTimeout_;
          break;
      }

      auto fiber = Fiber.getThis();
      assert(fiber, "Current fiber null – not running in TAsyncManager?");
      TAsyncEventReason eventReason = void;
      asyncManager_.addOneshotListener(socket_, eventType, timeout,
        scopedDelegate((TAsyncEventReason reason) {
          eventReason = reason;
          fiber.call();
        })
      );

      // Yields execution back to the async manager, will return back here once
      // the above listener is called.
      Fiber.yield();

      if (eventReason == TAsyncEventReason.TIMED_OUT) {
        // If we are cancelling the request due to a timed out operation, the
        // connection is in an undefined state, because the server could decide
        // to send the requested data later, or we could have already been half-
        // way into writing a request. Thus, we close the connection to make any
        // possibly queued up work items fail immediately. Besides, the server
        // is not very likely to immediately recover after a socket-level
        // timeout has expired anyway.
        closeImmediately();

        throw new TTransportException("Timed out while waiting for socket " ~
          "to get ready to " ~ to!string(eventType) ~ ".",
          TTransportException.Type.TIMED_OUT);
      }
    }
  }

  /// The TAsyncSocketManager to use for non-blocking I/O.
  TAsyncSocketManager asyncManager_;
}

private {
  // std.socket doesn't include SO_ERROR for reasons unknown.
  version (linux) {
    enum SO_ERROR = 4;
  } else version (OSX) {
    enum SO_ERROR = 0x1007;
  } else version (FreeBSD) {
    enum SO_ERROR = 0x1007;
  } else version (Windows) {
    import core.sys.windows.winsock2 : SO_ERROR;
  } else static assert(false, "Don't know SO_ERROR on this platform.");

  // This hack forces a delegate literal to be scoped, even if it is passed to
  // a function accepting normal delegates as well. DMD likes to allocate the
  // context on the heap anyway, but it seems to work for LDC.
  import std.traits : isDelegate;
  auto scopedDelegate(D)(scope D d) if (isDelegate!D) {
    return d;
  }
}