1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.async.socket;
20 
21 import core.thread : Fiber;
22 import core.time : dur, Duration;
23 import std.array : empty;
24 import std.conv : to;
25 import std.exception : enforce;
26 import std.socket;
27 import thrift.base;
28 import thrift.async.base;
29 import thrift.transport.base;
30 import thrift.transport.socket : TSocketBase;
31 import thrift.internal.endian;
32 import thrift.internal.socket;
33 
34 version (Windows) {
35   import core.sys.windows.winsock2 : connect;
36 } else version (Posix) {
37   import core.sys.posix.sys.socket : connect;
38 } else static assert(0, "Don't know connect on this platform.");
39 
40 version (OSX) {
41   import core.stdc.errno : ECONNRESET;
42 }
43 
44 version (Win32) {
45   import core.stdc.config : __c_long;
46 }
47 
48 /**
49  * Non-blocking socket implementation of the TTransport interface.
50  *
51  * Whenever a socket operation would block, TAsyncSocket registers a callback
52  * with the specified TAsyncSocketManager and yields.
53  *
54  * As for thrift.transport.socket, due to the limitations of std.socket,
55  * currently only TCP/IP sockets are supported (i.e. Unix domain sockets are
56  * not).
57  */
class TAsyncSocket : TSocketBase, TAsyncTransport {
  /**
   * Constructor that takes an already created, connected (!) socket.
   *
   * Params:
   *   asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
   *   socket = Already created, connected socket object. Will be switched to
   *     non-blocking mode if it isn't already.
   */
  this(TAsyncSocketManager asyncManager, Socket socket) {
    asyncManager_ = asyncManager;
    socket.blocking = false;
    super(socket);
  }

  /**
   * Creates a new unconnected socket that will connect to the given host
   * on the given port.
   *
   * Params:
   *   asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
   *   host = Remote host.
   *   port = Remote port.
   */
  this(TAsyncSocketManager asyncManager, string host, ushort port) {
    asyncManager_ = asyncManager;
    super(host, port);
  }

  /// The async manager this socket registers its work items and listeners
  /// with.
  override TAsyncManager asyncManager() @property {
    return asyncManager_;
  }

  /**
   * Asynchronously connects the socket.
   *
   * Completes without blocking and defers further operations on the socket
   * until the connection is established. If connecting fails in the
   * background, this is currently not indicated in any way other than every
   * call to read/write failing.
   *
   * Throws: TTransportException (NOT_OPEN) if no host/port is set, the host
   *   cannot be resolved, or connect() fails right away with an error other
   *   than "connection in progress". In the latter case the freshly created
   *   socket is closed again before throwing.
   */
  override void open() {
    if (isOpen) return;

    enforce(!host_.empty, new TTransportException(
      "Cannot open null host.", TTransportException.Type.NOT_OPEN));
    enforce(port_ != 0, new TTransportException(
      "Cannot open with null port.", TTransportException.Type.NOT_OPEN));


    // Cannot use std.socket.Socket.connect here because it hides away
    // EINPROGRESS/WSAWOULDBLOCK.
    Address addr;
    try {
      // Currently, we just go with the first address returned, could be made
      // more intelligent though – IPv6?
      addr = getAddress(host_, port_)[0];
    } catch (Exception e) {
      throw new TTransportException(`Unable to resolve host "` ~ host_ ~ `".`,
        TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
    }

    socket_ = new TcpSocket(addr.addressFamily);
    socket_.blocking = false;
    setSocketOpts();

    auto connectResult = connect(socket_.handle, addr.name(), addr.nameLen());
    if (connectResult == 0) {
      // If the connection could be established immediately, just return. I
      // don't know if this ever happens.
      return;
    }

    // Fetch the error code right away, before any other socket call can
    // overwrite it.
    auto connectErrno = getSocketErrno();
    if (connectErrno != CONNECT_INPROGRESS_ERRNO) {
      // The connection attempt failed outright. Release the socket again so
      // that its handle is not leaked and this transport is not left holding
      // a socket that never connected.
      closeImmediately();
      throw new TTransportException(`Could not establish connection to "` ~
        host_ ~ `": ` ~ socketErrnoString(connectErrno),
        TTransportException.Type.NOT_OPEN);
    }

    // This is the expected case: connect() signalled that the connection
    // is being established in the background. Queue up a work item with the
    // async manager which just defers any other operations on this
    // TAsyncSocket instance until the socket is ready.
    asyncManager_.execute(this,
      {
        auto fiber = Fiber.getThis();
        TAsyncEventReason reason = void;
        // The socket becoming writable (or the timeout expiring) resumes this
        // fiber via the listener below.
        asyncManager_.addOneshotListener(socket_, TAsyncEventType.WRITE,
          connectTimeout,
          scopedDelegate((TAsyncEventReason r){ reason = r; fiber.call(); })
        );
        Fiber.yield();

        if (reason == TAsyncEventReason.TIMED_OUT) {
          // Close the connection, so that subsequent work items fail immediately.
          closeImmediately();
          return;
        }

        // Query the outcome of the background connect via SO_ERROR.
        version (Win32) {
          __c_long errorCode = void;
        } else {
          int errorCode = void;
        }
        socket_.getOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_ERROR,
          errorCode);

        if (errorCode) {
          logInfo("Could not connect TAsyncSocket: %s",
            socketErrnoString(errorCode));

          // Close the connection, so that subsequent work items fail immediately.
          closeImmediately();
          return;
        }

      }
    );
  }

  /**
   * Closes the socket.
   *
   * Queues a work item that closes the socket with the async manager and
   * blocks the calling thread until that work item has run, i.e. until all
   * operations queued before it are finished.
   */
  override void close() {
    if (!isOpen) return;

    import core.sync.condition;
    import core.sync.mutex;

    auto doneMutex = new Mutex;
    auto doneCond = new Condition(doneMutex);

    // Explicit completion flag, guarded by doneMutex. Waiting on the flag
    // rather than on the bare condition protects against spurious wakeups
    // (which would let this function return while the scoped delegate below
    // still refers to this stack frame) and against a notification that is
    // sent before wait() is reached.
    bool done = false;

    synchronized (doneMutex) {
      asyncManager_.execute(this,
        scopedDelegate(
          {
            closeImmediately();
            synchronized (doneMutex) {
              done = true;
              doneCond.notifyAll();
            }
          }
        )
      );
      while (!done) doneCond.wait();
    }
  }

  /**
   * Peeks at the socket to check whether data is available.
   *
   * Returns: false if the socket is not open, if nothing could be peeked, or
   *   (where connresetOnPeerShutdown is set) if ECONNRESET was reported, in
   *   which case the socket is closed as well; true if at least one byte
   *   could be peeked.
   *
   * Throws: TTransportException (UNKNOWN) on any other receive error.
   */
  override bool peek() {
    if (!isOpen) return false;

    // NOTE(review): the socket is non-blocking, so a peek with no data pending
    // presumably fails with the would-block errno and ends up in the throw
    // below – confirm whether that is intended.
    ubyte buf;
    auto r = socket_.receive((&buf)[0..1], SocketFlags.PEEK);
    if (r == Socket.ERROR) {
      auto lastErrno = getSocketErrno();
      static if (connresetOnPeerShutdown) {
        if (lastErrno == ECONNRESET) {
          closeImmediately();
          return false;
        }
      }
      throw new TTransportException("Peeking into socket failed: " ~
        socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
    }
    return (r > 0);
  }

  /**
   * Reads up to buf.length bytes from the socket, yielding the current fiber
   * while the operation would block.
   *
   * Returns: The number of bytes received; 0 is also returned if ECONNRESET
   *   is reported and connresetOnPeerShutdown is set.
   *
   * Throws: TTransportException – NOT_OPEN if the socket is not open,
   *   TIMED_OUT if waiting for data timed out, UNKNOWN for any other receive
   *   error.
   */
  override size_t read(ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN));

    auto r = yieldOnBlock(socket_.receive(cast(void[])buf),
      TAsyncEventType.READ);

    // If recv went fine, immediately return.
    if (r >= 0) return r;

    // Something went wrong, find out how to handle it.
    auto lastErrno = getSocketErrno();

    static if (connresetOnPeerShutdown) {
      // Where this flag is set, ECONNRESET is presumably how the platform
      // reports that the peer has shut down the connection; report it as a
      // zero-length read instead of an error.
      if (lastErrno == ECONNRESET) {
        return 0;
      }
    }

    throw new TTransportException("Receiving from socket failed: " ~
      socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
  }

  /**
   * Writes the whole buffer to the socket by calling writeSome() repeatedly
   * until every byte has been sent.
   */
  override void write(in ubyte[] buf) {
    size_t sent;
    while (sent < buf.length) {
      sent += writeSome(buf[sent .. $]);
    }
    assert(sent == buf.length);
  }

  /**
   * Sends as much of buf as a single send() call accepts, yielding the
   * current fiber while the operation would block.
   *
   * Returns: The number of bytes written, always greater than zero.
   *
   * Throws: TTransportException – NOT_OPEN if the socket is not open or the
   *   error indicates a closed connection (the socket is closed in that
   *   case), TIMED_OUT if waiting for the socket timed out, UNKNOWN
   *   otherwise, including a send() result of 0.
   */
  override size_t writeSome(in ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN));

    auto r = yieldOnBlock(socket_.send(buf), TAsyncEventType.WRITE);

    // Everything went well, just return the number of bytes written.
    if (r > 0) return r;

    // Handle error conditions.
    if (r < 0) {
      auto lastErrno = getSocketErrno();

      auto type = TTransportException.Type.UNKNOWN;
      if (isSocketCloseErrno(lastErrno)) {
        type = TTransportException.Type.NOT_OPEN;
        closeImmediately();
      }

      throw new TTransportException("Sending to socket failed: " ~
        socketErrnoString(lastErrno), type);
    }

    // send() should never return 0.
    throw new TTransportException("Sending to socket failed (0 bytes written).",
      TTransportException.Type.UNKNOWN);
  }

  /// The amount of time in which a connection must be established before the
  /// open() call times out.
  Duration connectTimeout = dur!"seconds"(5);

private:
  /// Closes the underlying socket right away and drops the reference to it.
  /// Does nothing if the socket has already been closed, so that several
  /// queued work items may each try to close it.
  void closeImmediately() {
    if (socket_ is null) return;
    socket_.close();
    socket_ = null;
  }

  /**
   * Evaluates the (lazy) socket call and returns its result, unless it fails
   * with the would-block errno. In that case a one-shot listener for the
   * given event type is registered with the async manager and the current
   * fiber yields; once resumed, the call is retried.
   *
   * Params:
   *   call = The socket operation to (re-)evaluate.
   *   eventType = Event to wait for; also selects the receive or send
   *     timeout.
   *
   * Throws: TTransportException (TIMED_OUT) if the listener reports a
   *   timeout; the socket is closed before throwing.
   */
  T yieldOnBlock(T)(lazy T call, TAsyncEventType eventType) {
    while (true) {
      auto result = call();
      if (result != Socket.ERROR || getSocketErrno() != WOULD_BLOCK_ERRNO) return result;

      // We got an EAGAIN result, register a callback to return here once some
      // event happens and yield.

      Duration timeout = void;
      final switch (eventType) {
        case TAsyncEventType.READ:
          timeout = recvTimeout_;
          break;
        case TAsyncEventType.WRITE:
          timeout = sendTimeout_;
          break;
      }

      auto fiber = Fiber.getThis();
      assert(fiber, "Current fiber null – not running in TAsyncManager?");
      TAsyncEventReason eventReason = void;
      asyncManager_.addOneshotListener(socket_, eventType, timeout,
        scopedDelegate((TAsyncEventReason reason) {
          eventReason = reason;
          fiber.call();
        })
      );

      // Yields execution back to the async manager, will return back here once
      // the above listener is called.
      Fiber.yield();

      if (eventReason == TAsyncEventReason.TIMED_OUT) {
        // If we are cancelling the request due to a timed out operation, the
        // connection is in an undefined state, because the server could decide
        // to send the requested data later, or we could have already been half-
        // way into writing a request. Thus, we close the connection to make any
        // possibly queued up work items fail immediately. Besides, the server
        // is not very likely to immediately recover after a socket-level
        // timeout has expired anyway.
        closeImmediately();

        throw new TTransportException("Timed out while waiting for socket " ~
          "to get ready to " ~ to!string(eventType) ~ ".",
          TTransportException.Type.TIMED_OUT);
      }
    }
  }

  /// The TAsyncSocketManager to use for non-blocking I/O.
  TAsyncSocketManager asyncManager_;
}
349 
private {
  // SO_ERROR is not provided by std.socket, so the per-platform value is
  // supplied here; unknown platforms are rejected at compile time.
  version (linux) enum SO_ERROR = 4;
  else version (OSX) enum SO_ERROR = 0x1007;
  else version (FreeBSD) enum SO_ERROR = 0x1007;
  else version (Windows) import core.sys.windows.winsock2 : SO_ERROR;
  else static assert(false, "Don't know SO_ERROR on this platform.");

  import std.traits : isDelegate;

  // Identity function whose only purpose is its `scope` parameter: routing a
  // delegate literal through it is a hack to have the literal treated as
  // scoped even when the final callee takes ordinary delegates. Per the
  // original author's note, DMD tends to heap-allocate the context anyway,
  // while the trick seems to work for LDC.
  auto scopedDelegate(Dg)(scope Dg dg) if (isDelegate!Dg) {
    return dg;
  }
}