1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.transport.socket;
21 import core.thread : Thread;
22 import core.time : dur, Duration;
23 import std.array : empty;
24 import std.conv : text, to;
25 import std.exception : enforce;
26 import std.socket;
27 import thrift.base;
28 import thrift.transport.base;
29 import thrift.internal.socket;
// The socket code in this module refers to the "connection reset by peer"
// error code as ECONNRESET, so that name has to exist on every platform.
version (Windows) {
  import core.sys.windows.winsock2 : WSAECONNRESET;
  // Winsock only provides the WSA-prefixed constant; expose it under the
  // name used by the rest of this module.
  enum ECONNRESET = WSAECONNRESET;
} else version (Posix) {
  import core.stdc.errno : ECONNRESET;
} else static assert(0, "Don't know ECONNRESET on this platform.");
38 /**
39  * Common parts of a socket TTransport implementation, regardless of how the
40  * actual I/O is performed (sync/async).
41  */
abstract class TSocketBase : TBaseTransport {
  /**
   * Wraps a socket that has already been created and connected (!), and
   * applies the socket options via setSocketOpts().
   *
   * Params:
   *   socket = The created, connected socket to wrap.
   */
  this(Socket socket) {
    this.socket_ = socket;
    setSocketOpts();
  }

  /**
   * Remembers the remote endpoint for a connection that is established
   * later; no socket is created here.
   *
   * Params:
   *   host = Remote host.
   *   port = Remote port.
   */
  this(string host, ushort port) {
    this.host_ = host;
    this.port_ = port;
  }

  /**
   * Whether a socket object is currently attached to this transport.
   */
  @property override bool isOpen() {
    return !(socket_ is null);
  }

  /**
   * Writes as much of the given data to the socket as a single OS call
   * accepts.
   *
   * Params:
   *   buf = Data to write.
   *
   * Returns: The number of bytes actually written, never more than
   *   buf.length.
   */
  abstract size_t writeSome(in ubyte[] buf) out (written) {
    // DMD @@BUG@@: The check below is disabled because, when enabled, it
    // e.g. fails in the async_test_server: buf.length evaluates to 0 inside
    // the contract although it correctly is 27 (equal to the return value)
    // inside the method body.
    version (none) assert(written <= buf.length, text("Implementation wrote " ~
      "more data than requested to?! (", written, " vs. ", buf.length, ")"));
  } body {
    // A body is only present because the compiler demands one as soon as
    // contracts are attached to a method.
    assert(0, "DMD bug? – Why would contracts work for interfaces, but not " ~
      "for abstract methods? " ~
      "(Error: function […] in and out contracts require function body");
  }

  /**
   * Returns the address of the peer the socket is actually connected to.
   *
   * This is different from the host and port properties, which hold the
   * values used to establish the connection and are not updated afterwards.
   * The address is looked up on first use and cached.
   *
   * Throws: TTransportException (NOT_OPEN) if the socket is not open.
   */
  Address getPeerAddress() {
    if (!isOpen) {
      throw new TTransportException("Cannot get peer host for closed socket.",
        TTransportException.Type.NOT_OPEN);
    }

    if (peerAddress_ is null) {
      // First request: ask the socket and remember the answer.
      peerAddress_ = socket_.remoteAddress();
      assert(peerAddress_);
    }

    return peerAddress_;
  }

  /**
   * The host passed to the host/port constructor. Null if the object was
   * constructed from an already connected socket.
   */
  @property string host() const {
    return host_;
  }

  /**
   * The port passed to the host/port constructor. Zero if the object was
   * constructed from an already connected socket.
   */
  @property ushort port() const {
    return port_;
  }

  /// The timeout used for sending on the socket.
  @property Duration sendTimeout() const {
    return sendTimeout_;
  }

  /// Ditto
  @property void sendTimeout(Duration value) {
    sendTimeout_ = value;
  }

  /// The timeout used for receiving from the socket. Values smaller than
  /// 500 ms are not supported on Windows.
  @property Duration recvTimeout() const {
    return recvTimeout_;
  }

  /// Ditto
  @property void recvTimeout(Duration value) {
    recvTimeout_ = value;
  }

  /**
   * The OS-level handle of the wrapped socket.
   *
   * Normally there is no need to touch it directly, but it can be required
   * when interfacing with C libraries.
   */
  @property typeof(socket_.handle()) socketHandle() {
    return socket_.handle();
  }

protected:
  /**
   * Applies the socket options this transport needs to the wrapped socket.
   */
  void setSocketOpts() {
    // Turn lingering off; a failure is only logged.
    try {
      Linger noLinger;
      noLinger.on = 0;
      noLinger.time = 0;
      socket_.setOption(SocketOptionLevel.SOCKET, SocketOption.LINGER,
        noLinger);
    } catch (SocketException e) {
      logError("Could not set socket option: %s", e);
    }

    // Best-effort attempt to disable Nagle's algorithm. This fails if a
    // non-TCP socket was handed in through the Socket-accepting constructor,
    // which is why the error is deliberately ignored.
    try {
      socket_.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, true);
    } catch (SocketException ignored) {
    }
  }

  /// Host given at construction time.
  string host_;

  /// Port given at construction time.
  ushort port_;

  /// Send timeout.
  Duration sendTimeout_;

  /// Receive timeout.
  Duration recvTimeout_;

  /// Peer address cache, filled by getPeerAddress().
  Address peerAddress_;

  /// Cached peer host name.
  string peerHost_;

  /// Cached peer port.
  ushort peerPort_;

  /// The wrapped socket object; null while not open.
  Socket socket_;
}
208 /**
209  * Socket implementation of the TTransport interface.
210  *
211  * Due to the limitations of std.socket, currently only TCP/IP sockets are
212  * supported (i.e. Unix domain sockets are not).
213  */
class TSocket : TSocketBase {
  /**
   * Wraps an already created, connected socket.
   *
   * Params:
   *   socket = Already created, connected socket object.
   */
  this(Socket socket) {
    super(socket);
  }

  /**
   * Creates an unconnected transport for the given remote endpoint; the
   * connection is established by open().
   *
   * Params:
   *   host = Remote host.
   *   port = Remote port.
   */
  this(string host, ushort port) {
    super(host, port);
  }

  // Overriding only the setters below would hide the inherited getters when
  // accessed through a TSocket reference; pull the base overload sets back in.
  alias TSocketBase.sendTimeout sendTimeout;
  alias TSocketBase.recvTimeout recvTimeout;

  /**
   * Connects the socket. Does nothing if it is already open.
   *
   * All addresses the host resolves to are tried in order; the first one
   * that can be connected to is used.
   *
   * Throws: TTransportException (NOT_OPEN) if host or port are not set, the
   *   host cannot be resolved or none of the addresses could be connected to.
   */
  override void open() {
    if (isOpen) return;

    enforce(!host_.empty, new TTransportException(
      "Cannot open socket to null host.", TTransportException.Type.NOT_OPEN));
    enforce(port_ != 0, new TTransportException(
      "Cannot open socket to port zero.", TTransportException.Type.NOT_OPEN));

    Address[] addrs;
    try {
      addrs = getAddress(host_, port_);
    } catch (SocketException e) {
      throw new TTransportException("Could not resolve given host string.",
        TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
    }

    Exception[] errors;
    foreach (addr; addrs) {
      try {
        socket_ = new TcpSocket(addr.addressFamily);
        setSocketOpts();
        socket_.connect(addr);
        break;
      } catch (SocketException e) {
        // Release the handle of the failed attempt right away instead of
        // leaving it around until the garbage collector finalizes it.
        if (socket_ !is null) {
          socket_.close();
          socket_ = null;
        }
        errors ~= e;
      }
    }

    // One error per address means that no attempt succeeded.
    if (errors.length == addrs.length) {
      socket_ = null;
      // Need to throw a TTransportException to abide the TTransport API.
      import std.algorithm, std.range;
      throw new TTransportException(
        text("Failed to connect to ", host_, ":", port_, "."),
        TTransportException.Type.NOT_OPEN,
        __FILE__, __LINE__,
        new TCompoundOperationException(
          text(
            "All addresses tried failed (",
            joiner(map!q{text(a[0], `: "`, a[1].msg, `"`)}(zip(addrs, errors)), ", "),
            ")."
          ),
          errors
        )
      );
    }
  }

  /**
   * Closes the socket. Does nothing if it is not open.
   */
  override void close() {
    if (!isOpen) return;

    socket_.close();
    socket_ = null;
    // The cached peer address belongs to the connection just closed; drop it
    // so that a later open() does not report a stale peer.
    peerAddress_ = null;
  }

  /**
   * Checks whether data can be read from the socket, without consuming it.
   *
   * Returns: true if at least one byte is available, false if the socket is
   *   not open or no data was returned.
   *
   * Throws: TTransportException (UNKNOWN) if peeking into the socket fails.
   */
  override bool peek() {
    if (!isOpen) return false;

    ubyte buf;
    auto r = socket_.receive((&buf)[0 .. 1], SocketFlags.PEEK);
    if (r == -1) {
      auto lastErrno = getSocketErrno();
      static if (connresetOnPeerShutdown) {
        // Where connresetOnPeerShutdown is set, ECONNRESET is handled as the
        // connection having ended rather than as an error.
        if (lastErrno == ECONNRESET) {
          close();
          return false;
        }
      }
      throw new TTransportException("Peeking into socket failed: " ~
        socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
    }
    return (r > 0);
  }

  /**
   * Reads up to buf.length bytes from the socket.
   *
   * A receive call that fails with INTERRUPTED_ERRNO is retried up to
   * maxRecvRetries times.
   *
   * Params:
   *   buf = Buffer the received data is written to.
   *
   * Returns: The number of bytes received.
   *
   * Throws: TTransportException – NOT_OPEN if the socket is not open or had
   *   to be closed because of the error, TIMED_OUT if the receive timed out,
   *   UNKNOWN otherwise.
   */
  override size_t read(ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN));

    typeof(getSocketErrno()) lastErrno;
    // The counter is wider than maxRecvRetries_ so that the loop also
    // terminates for maxRecvRetries == ushort.max instead of wrapping around.
    for (uint tries = 0; tries <= maxRecvRetries_; ++tries) {
      auto r = socket_.receive(cast(void[])buf);

      // If recv went fine, immediately return.
      if (r >= 0) return r;

      // Something went wrong, find out how to handle it.
      lastErrno = getSocketErrno();

      if (lastErrno == INTERRUPTED_ERRNO) {
        // If the syscall was interrupted, just try again.
        continue;
      }

      static if (connresetOnPeerShutdown) {
        // Where connresetOnPeerShutdown is set, ECONNRESET is reported to the
        // caller as "no more data" instead of as an error.
        if (lastErrno == ECONNRESET) {
          return 0;
        }
      }

      // Not an error which is handled in a special way, just leave the loop.
      break;
    }

    if (isSocketCloseErrno(lastErrno)) {
      close();
      throw new TTransportException("Receiving failed, closing socket: " ~
        socketErrnoString(lastErrno), TTransportException.Type.NOT_OPEN);
    } else if (lastErrno == TIMEOUT_ERRNO) {
      throw new TTransportException(TTransportException.Type.TIMED_OUT);
    } else {
      throw new TTransportException("Receiving from socket failed: " ~
        socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
    }
  }

  /**
   * Writes the whole buffer to the socket, calling writeSome() as often as
   * needed.
   *
   * Params:
   *   buf = Data to write.
   *
   * Throws: TTransportException (TIMED_OUT) if writeSome() could not write
   *   anything, plus whatever writeSome() throws.
   */
  override void write(in ubyte[] buf) {
    size_t sent;
    while (sent < buf.length) {
      auto b = writeSome(buf[sent .. $]);
      if (b == 0) {
        // This should only happen if the timeout set with SO_SNDTIMEO expired.
        throw new TTransportException("send() timeout expired.",
          TTransportException.Type.TIMED_OUT);
      }
      sent += b;
    }
    assert(sent == buf.length);
  }

  /**
   * Writes as much data to the socket as possible with a single send call.
   *
   * Params:
   *   buf = Data to write.
   *
   * Returns: The number of bytes written, or 0 if the send failed with
   *   WOULD_BLOCK_ERRNO.
   *
   * Throws: TTransportException – NOT_OPEN if the socket is not open or had
   *   to be closed because of the error, UNKNOWN otherwise.
   */
  override size_t writeSome(in ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN));

    auto r = socket_.send(buf);

    // Everything went well, just return the number of bytes written.
    if (r > 0) return r;

    // Handle error conditions.
    if (r < 0) {
      auto lastErrno = getSocketErrno();

      if (lastErrno == WOULD_BLOCK_ERRNO) {
        // Not an exceptional error per se – even with blocking sockets,
        // EAGAIN apparently is returned sometimes on out-of-resource
        // conditions (see the C++ implementation for details). Also, this
        // allows using TSocket with non-blocking sockets e.g. in
        // TNonblockingServer.
        return 0;
      }

      auto type = TTransportException.Type.UNKNOWN;
      if (isSocketCloseErrno(lastErrno)) {
        type = TTransportException.Type.NOT_OPEN;
        close();
      }

      throw new TTransportException("Sending to socket failed: " ~
        socketErrnoString(lastErrno), type);
    }

    // send() should never return 0.
    throw new TTransportException("Sending to socket failed (0 bytes written).",
      TTransportException.Type.UNKNOWN);
  }

  /// Stores the send timeout and, if a socket exists, applies it to it.
  override void sendTimeout(Duration value) @property {
    super.sendTimeout(value);
    setTimeout(SocketOption.SNDTIMEO, value);
  }

  /// Stores the receive timeout and, if a socket exists, applies it to it.
  override void recvTimeout(Duration value) @property {
    super.recvTimeout(value);
    setTimeout(SocketOption.RCVTIMEO, value);
  }

  /**
   * Maximum number of retries for receiving from the socket on read() in
   * case the receive call was interrupted (INTERRUPTED_ERRNO).
   */
  ushort maxRecvRetries() @property const {
    return maxRecvRetries_;
  }

  /// Ditto
  void maxRecvRetries(ushort value) @property {
    maxRecvRetries_ = value;
  }

  /// Ditto
  enum DEFAULT_MAX_RECV_RETRIES = 5;

protected:
  /**
   * Applies the base class socket options plus the currently configured
   * send and receive timeouts.
   */
  override void setSocketOpts() {
    super.setSocketOpts();
    setTimeout(SocketOption.SNDTIMEO, sendTimeout_);
    setTimeout(SocketOption.RCVTIMEO, recvTimeout_);
  }

  /**
   * Applies a timeout to the socket, if there currently is one.
   *
   * Params:
   *   type = SocketOption.SNDTIMEO or SocketOption.RCVTIMEO.
   *   value = The timeout to set.
   *
   * Throws: TTransportException (UNKNOWN) if the option could not be set.
   */
  void setTimeout(SocketOption type, Duration value) {
    assert(type == SocketOption.SNDTIMEO || type == SocketOption.RCVTIMEO);
    // Warn on every Windows target, not just 32-bit ones.
    version (Windows) {
      if (value > dur!"hnsecs"(0) && value < dur!"msecs"(500)) {
        logError(
          "Socket %s timeout of %s ms might be raised to 500 ms on Windows.",
          (type == SocketOption.SNDTIMEO) ? "send" : "receive",
          value.total!"msecs"
        );
      }
    }

    // Without a socket the value is only stored by the caller; it gets
    // applied by setSocketOpts() once a socket has been created.
    if (socket_) {
      try {
        socket_.setOption(SocketOptionLevel.SOCKET, type, value);
      } catch (SocketException e) {
        throw new TTransportException(
          "Could not set timeout.",
          TTransportException.Type.UNKNOWN,
          __FILE__,
          __LINE__,
          e
        );
      }
    }
  }

  /// Maximum number of recv() retries.
  ushort maxRecvRetries_  = DEFAULT_MAX_RECV_RETRIES;
}