1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 module thrift.transport.socket; 20 21 import core.stdc.errno: ECONNRESET; 22 import core.thread : Thread; 23 import core.time : dur, Duration; 24 import std.array : empty; 25 import std.conv : text, to; 26 import std.exception : enforce; 27 import std.socket; 28 import thrift.base; 29 import thrift.transport.base; 30 import thrift.internal.socket; 31 32 /** 33 * Common parts of a socket TTransport implementation, regardless of how the 34 * actual I/O is performed (sync/async). 35 */ 36 abstract class TSocketBase : TBaseTransport { 37 /** 38 * Constructor that takes an already created, connected (!) socket. 39 * 40 * Params: 41 * socket = Already created, connected socket object. 42 */ 43 this(Socket socket) { 44 socket_ = socket; 45 setSocketOpts(); 46 } 47 48 /** 49 * Creates a new unconnected socket that will connect to the given host 50 * on the given port. 51 * 52 * Params: 53 * host = Remote host. 54 * port = Remote port. 55 */ 56 this(string host, ushort port) { 57 host_ = host; 58 port_ = port; 59 } 60 61 /** 62 * Checks whether the socket is connected. 63 */ 64 override bool isOpen() @property { 65 return socket_ !is null; 66 } 67 68 /** 69 * Writes as much data to the socket as there can be in a single OS call. 70 * 71 * Params: 72 * buf = Data to write. 73 * 74 * Returns: The actual number of bytes written. Never more than buf.length. 75 */ 76 abstract size_t writeSome(in ubyte[] buf) out (written) { 77 // DMD @@BUG@@: Enabling this e.g. fails the contract in the 78 // async_test_server, because buf.length evaluates to 0 here, even though 79 // in the method body it correctly is 27 (equal to the return value). 80 version (none) assert(written <= buf.length, text("Implementation wrote " ~ 81 "more data than requested to?! (", written, " vs. ", buf.length, ")")); 82 } body { 83 assert(0, "DMD bug? – Why would contracts work for interfaces, but not " ~ 84 "for abstract methods? " ~ 85 "(Error: function […] in and out contracts require function body"); 86 } 87 88 /** 89 * Returns the actual address of the peer the socket is connected to. 90 * 91 * In contrast, the host and port properties contain the address used to 92 * establish the connection, and are not updated after the connection. 93 * 94 * The socket must be open when calling this. 95 */ 96 Address getPeerAddress() { 97 enforce(isOpen, new TTransportException("Cannot get peer host for " ~ 98 "closed socket.", TTransportException.Type.NOT_OPEN)); 99 100 if (!peerAddress_) { 101 peerAddress_ = socket_.remoteAddress(); 102 assert(peerAddress_); 103 } 104 105 return peerAddress_; 106 } 107 108 /** 109 * The host the socket is connected to or will connect to. Null if an 110 * already connected socket was used to construct the object. 111 */ 112 string host() const @property { 113 return host_; 114 } 115 116 /** 117 * The port the socket is connected to or will connect to. Zero if an 118 * already connected socket was used to construct the object. 119 */ 120 ushort port() const @property { 121 return port_; 122 } 123 124 /// The socket send timeout. 125 Duration sendTimeout() const @property { 126 return sendTimeout_; 127 } 128 129 /// Ditto 130 void sendTimeout(Duration value) @property { 131 sendTimeout_ = value; 132 } 133 134 /// The socket receiving timeout. Values smaller than 500 ms are not 135 /// supported on Windows. 136 Duration recvTimeout() const @property { 137 return recvTimeout_; 138 } 139 140 /// Ditto 141 void recvTimeout(Duration value) @property { 142 recvTimeout_ = value; 143 } 144 145 /** 146 * Returns the OS handle of the underlying socket. 147 * 148 * Should not usually be used directly, but access to it can be necessary 149 * to interface with C libraries. 150 */ 151 typeof(socket_.handle()) socketHandle() @property { 152 return socket_.handle(); 153 } 154 155 protected: 156 /** 157 * Sets the needed socket options. 158 */ 159 void setSocketOpts() { 160 try { 161 alias SocketOptionLevel.SOCKET lvlSock; 162 Linger l; 163 l.on = 0; 164 l.time = 0; 165 socket_.setOption(lvlSock, SocketOption.LINGER, l); 166 } catch (SocketException e) { 167 logError("Could not set socket option: %s", e); 168 } 169 170 // Just try to disable Nagle's algorithm – this will fail if we are passed 171 // in a non-TCP socket via the Socket-accepting constructor. 172 try { 173 socket_.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, true); 174 } catch (SocketException e) {} 175 } 176 177 /// Remote host. 178 string host_; 179 180 /// Remote port. 181 ushort port_; 182 183 /// Timeout for sending. 184 Duration sendTimeout_; 185 186 /// Timeout for receiving. 187 Duration recvTimeout_; 188 189 /// Cached peer address. 190 Address peerAddress_; 191 192 /// Cached peer host name. 193 string peerHost_; 194 195 /// Cached peer port. 196 ushort peerPort_; 197 198 /// Wrapped socket object. 199 Socket socket_; 200 } 201 202 /** 203 * Socket implementation of the TTransport interface. 204 * 205 * Due to the limitations of std.socket, currently only TCP/IP sockets are 206 * supported (i.e. Unix domain sockets are not). 207 */ 208 class TSocket : TSocketBase { 209 /// 210 this(Socket socket) { 211 super(socket); 212 } 213 214 /// 215 this(string host, ushort port) { 216 super(host, port); 217 } 218 219 /** 220 * Connects the socket. 221 */ 222 override void open() { 223 if (isOpen) return; 224 225 enforce(!host_.empty, new TTransportException( 226 "Cannot open socket to null host.", TTransportException.Type.NOT_OPEN)); 227 enforce(port_ != 0, new TTransportException( 228 "Cannot open socket to port zero.", TTransportException.Type.NOT_OPEN)); 229 230 Address[] addrs; 231 try { 232 addrs = getAddress(host_, port_); 233 } catch (SocketException e) { 234 throw new TTransportException("Could not resolve given host string.", 235 TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e); 236 } 237 238 Exception[] errors; 239 foreach (addr; addrs) { 240 try { 241 socket_ = new TcpSocket(addr.addressFamily); 242 setSocketOpts(); 243 socket_.connect(addr); 244 break; 245 } catch (SocketException e) { 246 errors ~= e; 247 } 248 } 249 if (errors.length == addrs.length) { 250 socket_ = null; 251 // Need to throw a TTransportException to abide the TTransport API. 252 import std.algorithm, std.range; 253 throw new TTransportException( 254 text("Failed to connect to ", host_, ":", port_, "."), 255 TTransportException.Type.NOT_OPEN, 256 __FILE__, __LINE__, 257 new TCompoundOperationException( 258 text( 259 "All addresses tried failed (", 260 joiner(map!q{text(a[0], `: "`, a[1].msg, `"`)}(zip(addrs, errors)), ", "), 261 ")." 262 ), 263 errors 264 ) 265 ); 266 } 267 } 268 269 /** 270 * Closes the socket. 271 */ 272 override void close() { 273 if (!isOpen) return; 274 275 socket_.close(); 276 socket_ = null; 277 } 278 279 override bool peek() { 280 if (!isOpen) return false; 281 282 ubyte buf; 283 auto r = socket_.receive((&buf)[0 .. 1], SocketFlags.PEEK); 284 if (r == -1) { 285 auto lastErrno = getSocketErrno(); 286 static if (connresetOnPeerShutdown) { 287 if (lastErrno == ECONNRESET) { 288 close(); 289 return false; 290 } 291 } 292 throw new TTransportException("Peeking into socket failed: " ~ 293 socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN); 294 } 295 return (r > 0); 296 } 297 298 override size_t read(ubyte[] buf) { 299 enforce(isOpen, new TTransportException( 300 "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN)); 301 302 typeof(getSocketErrno()) lastErrno; 303 ushort tries; 304 while (tries++ <= maxRecvRetries_) { 305 auto r = socket_.receive(cast(void[])buf); 306 307 // If recv went fine, immediately return. 308 if (r >= 0) return r; 309 310 // Something went wrong, find out how to handle it. 311 lastErrno = getSocketErrno(); 312 313 if (lastErrno == INTERRUPTED_ERRNO) { 314 // If the syscall was interrupted, just try again. 315 continue; 316 } 317 318 static if (connresetOnPeerShutdown) { 319 // See top comment. 320 if (lastErrno == ECONNRESET) { 321 return 0; 322 } 323 } 324 325 // Not an error which is handled in a special way, just leave the loop. 326 break; 327 } 328 329 if (isSocketCloseErrno(lastErrno)) { 330 close(); 331 throw new TTransportException("Receiving failed, closing socket: " ~ 332 socketErrnoString(lastErrno), TTransportException.Type.NOT_OPEN); 333 } else if (lastErrno == TIMEOUT_ERRNO) { 334 throw new TTransportException(TTransportException.Type.TIMED_OUT); 335 } else { 336 throw new TTransportException("Receiving from socket failed: " ~ 337 socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN); 338 } 339 } 340 341 override void write(in ubyte[] buf) { 342 size_t sent; 343 while (sent < buf.length) { 344 auto b = writeSome(buf[sent .. $]); 345 if (b == 0) { 346 // This should only happen if the timeout set with SO_SNDTIMEO expired. 347 throw new TTransportException("send() timeout expired.", 348 TTransportException.Type.TIMED_OUT); 349 } 350 sent += b; 351 } 352 assert(sent == buf.length); 353 } 354 355 override size_t writeSome(in ubyte[] buf) { 356 enforce(isOpen, new TTransportException( 357 "Cannot write if file is not open.", TTransportException.Type.NOT_OPEN)); 358 359 auto r = socket_.send(buf); 360 361 // Everything went well, just return the number of bytes written. 362 if (r > 0) return r; 363 364 // Handle error conditions. 365 if (r < 0) { 366 auto lastErrno = getSocketErrno(); 367 368 if (lastErrno == WOULD_BLOCK_ERRNO) { 369 // Not an exceptional error per se – even with blocking sockets, 370 // EAGAIN apparently is returned sometimes on out-of-resource 371 // conditions (see the C++ implementation for details). Also, this 372 // allows using TSocket with non-blocking sockets e.g. in 373 // TNonblockingServer. 374 return 0; 375 } 376 377 auto type = TTransportException.Type.UNKNOWN; 378 if (isSocketCloseErrno(lastErrno)) { 379 type = TTransportException.Type.NOT_OPEN; 380 close(); 381 } 382 383 throw new TTransportException("Sending to socket failed: " ~ 384 socketErrnoString(lastErrno), type); 385 } 386 387 // send() should never return 0. 388 throw new TTransportException("Sending to socket failed (0 bytes written).", 389 TTransportException.Type.UNKNOWN); 390 } 391 392 override void sendTimeout(Duration value) @property { 393 super.sendTimeout(value); 394 setTimeout(SocketOption.SNDTIMEO, value); 395 } 396 397 override void recvTimeout(Duration value) @property { 398 super.recvTimeout(value); 399 setTimeout(SocketOption.RCVTIMEO, value); 400 } 401 402 /** 403 * Maximum number of retries for receiving from socket on read() in case of 404 * EAGAIN/EINTR. 405 */ 406 ushort maxRecvRetries() @property const { 407 return maxRecvRetries_; 408 } 409 410 /// Ditto 411 void maxRecvRetries(ushort value) @property { 412 maxRecvRetries_ = value; 413 } 414 415 /// Ditto 416 enum DEFAULT_MAX_RECV_RETRIES = 5; 417 418 protected: 419 override void setSocketOpts() { 420 super.setSocketOpts(); 421 setTimeout(SocketOption.SNDTIMEO, sendTimeout_); 422 setTimeout(SocketOption.RCVTIMEO, recvTimeout_); 423 } 424 425 void setTimeout(SocketOption type, Duration value) { 426 assert(type == SocketOption.SNDTIMEO || type == SocketOption.RCVTIMEO); 427 version (Win32) { 428 if (value > dur!"hnsecs"(0) && value < dur!"msecs"(500)) { 429 logError( 430 "Socket %s timeout of %s ms might be raised to 500 ms on Windows.", 431 (type == SocketOption.SNDTIMEO) ? "send" : "receive", 432 value.total!"msecs" 433 ); 434 } 435 } 436 437 if (socket_) { 438 try { 439 socket_.setOption(SocketOptionLevel.SOCKET, type, value); 440 } catch (SocketException e) { 441 throw new TTransportException( 442 "Could not set timeout.", 443 TTransportException.Type.UNKNOWN, 444 __FILE__, 445 __LINE__, 446 e 447 ); 448 } 449 } 450 } 451 452 /// Maximum number of recv() retries. 453 ushort maxRecvRetries_ = DEFAULT_MAX_RECV_RETRIES; 454 }