1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.async.socket;
20 
21 import core.stdc.errno: ECONNRESET;
22 import core.thread : Fiber;
23 import core.time : dur, Duration;
24 import std.array : empty;
25 import std.conv : to;
26 import std.exception : enforce;
27 import std.socket;
28 import thrift.base;
29 import thrift.async.base;
30 import thrift.transport.base;
31 import thrift.transport.socket : TSocketBase;
32 import thrift.internal.endian;
33 import thrift.internal.socket;
34 
35 version (Windows) {
36   import std.c.windows.winsock : connect;
37 } else version (Posix) {
38   import core.sys.posix.sys.socket : connect;
39 } else static assert(0, "Don't know connect on this platform.");
40 
41 /**
42  * Non-blocking socket implementation of the TTransport interface.
43  *
44  * Whenever a socket operation would block, TAsyncSocket registers a callback
45  * with the specified TAsyncSocketManager and yields.
46  *
47  * As for thrift.transport.socket, due to the limitations of std.socket,
48  * currently only TCP/IP sockets are supported (i.e. Unix domain sockets are
49  * not).
50  */
class TAsyncSocket : TSocketBase, TAsyncTransport {
  /**
   * Constructor that takes an already created, connected (!) socket.
   *
   * Params:
   *   asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
   *   socket = Already created, connected socket object. Will be switched to
   *     non-blocking mode if it isn't already.
   */
  this(TAsyncSocketManager asyncManager, Socket socket) {
    asyncManager_ = asyncManager;
    socket.blocking = false;
    super(socket);
  }

  /**
   * Creates a new unconnected socket that will connect to the given host
   * on the given port.
   *
   * Params:
   *   asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
   *   host = Remote host.
   *   port = Remote port.
   */
  this(TAsyncSocketManager asyncManager, string host, ushort port) {
    asyncManager_ = asyncManager;
    super(host, port);
  }

  /**
   * The async manager this socket registers its listeners and work items
   * with (the one passed to the constructor).
   */
  override TAsyncManager asyncManager() @property {
    return asyncManager_;
  }

  /**
   * Asynchronously connects the socket.
   *
   * Completes without blocking and defers further operations on the socket
   * until the connection is established. If connecting fails in the
   * background, this is currently not indicated in any way other than every
   * call to read/write failing.
   *
   * Does nothing if the socket is already open.
   *
   * Throws: TTransportException (NOT_OPEN) if the host is empty, the port is
   *   0, the host cannot be resolved, or connect() fails right away with an
   *   error other than "connection in progress".
   */
  override void open() {
    if (isOpen) return;

    enforce(!host_.empty, new TTransportException(
      "Cannot open null host.", TTransportException.Type.NOT_OPEN));
    enforce(port_ != 0, new TTransportException(
      "Cannot open with null port.", TTransportException.Type.NOT_OPEN));

    // Cannot use std.socket.Socket.connect here because it hides away
    // EINPROGRESS/WSAWOULDBLOCK.
    Address addr;
    try {
      // Currently, we just go with the first address returned, could be made
      // more intelligent though – IPv6?
      addr = getAddress(host_, port_)[0];
    } catch (Exception e) {
      throw new TTransportException(`Unable to resolve host "` ~ host_ ~ `".`,
        TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
    }

    socket_ = new TcpSocket(addr.addressFamily);
    socket_.blocking = false;
    setSocketOpts();

    auto connectResult = connect(socket_.handle, addr.name(), addr.nameLen());
    if (connectResult == 0) {
      // If the connection could be established immediately, just return. I
      // don't know if this ever happens.
      return;
    }

    // Fetch the error code before doing anything else that could overwrite
    // it (such as closing the socket below).
    auto connectErrno = getSocketErrno();
    if (connectErrno != CONNECT_INPROGRESS_ERRNO) {
      // The connection attempt failed outright. Release the socket we just
      // created so that its handle is not leaked and later calls do not
      // operate on a socket that was never connected.
      closeImmediately();
      throw new TTransportException(`Could not establish connection to "` ~
        host_ ~ `": ` ~ socketErrnoString(connectErrno),
        TTransportException.Type.NOT_OPEN);
    }

    // This is the expected case: connect() signalled that the connection
    // is being established in the background. Queue up a work item with the
    // async manager which just defers any other operations on this
    // TAsyncSocket instance until the socket is ready.
    asyncManager_.execute(this,
      {
        auto fiber = Fiber.getThis();
        TAsyncEventReason reason = void;
        // The socket becoming writable (or the timeout expiring) resumes this
        // work item; the listener records which of the two happened.
        asyncManager_.addOneshotListener(socket_, TAsyncEventType.WRITE,
          connectTimeout,
          scopedDelegate((TAsyncEventReason r){ reason = r; fiber.call(); })
        );
        Fiber.yield();

        if (reason == TAsyncEventReason.TIMED_OUT) {
          // Close the connection, so that subsequent work items fail immediately.
          closeImmediately();
          return;
        }

        // Ask the socket for the outcome of the background connect.
        int pendingError = void;
        socket_.getOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_ERROR,
          pendingError);

        if (pendingError) {
          logInfo("Could not connect TAsyncSocket: %s",
            socketErrnoString(pendingError));

          // Close the connection, so that subsequent work items fail immediately.
          closeImmediately();
        }
      }
    );
  }

  /**
   * Closes the socket.
   *
   * Will block until all currently active operations are finished before the
   * socket is closed. Does nothing if the socket is not open.
   */
  override void close() {
    if (!isOpen) return;

    import core.sync.condition;
    import core.sync.mutex;

    auto doneMutex = new Mutex;
    auto doneCond = new Condition(doneMutex);

    // Guarded by doneMutex. Waiting on this flag instead of on the bare
    // condition protects against spurious wakeups (which would let this
    // function return while the scoped delegate below still refers to its
    // stack frame) and against the notification being sent before the wait
    // has started.
    bool done = false;

    synchronized (doneMutex) {
      asyncManager_.execute(this,
        scopedDelegate(
          {
            closeImmediately();
            synchronized (doneMutex) {
              done = true;
              doneCond.notifyAll();
            }
          }
        )
      );
      while (!done) doneCond.wait();
    }
  }

  /**
   * Checks for pending data without consuming it by peeking a single byte.
   *
   * Returns: false if the socket is not open, if the peek returned no bytes,
   *   or (where connresetOnPeerShutdown applies) if the peer reset the
   *   connection, in which case the socket is closed as well; true if at
   *   least one byte is available.
   *
   * Throws: TTransportException (UNKNOWN) on any other socket error.
   */
  override bool peek() {
    if (!isOpen) return false;

    // NOTE(review): the socket is non-blocking, so with no data pending the
    // receive below presumably fails with the would-block errno and ends up
    // in the generic throw at the bottom – confirm whether that is intended.
    ubyte buf;
    auto r = socket_.receive((&buf)[0..1], SocketFlags.PEEK);
    if (r == Socket.ERROR) {
      auto lastErrno = getSocketErrno();
      static if (connresetOnPeerShutdown) {
        if (lastErrno == ECONNRESET) {
          closeImmediately();
          return false;
        }
      }
      throw new TTransportException("Peeking into socket failed: " ~
        socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
    }
    return (r > 0);
  }

  /**
   * Reads up to buf.length bytes, yielding to the async manager while the
   * socket has nothing to offer.
   *
   * Params:
   *   buf = Buffer to receive into.
   *
   * Returns: The number of bytes received. 0 is also returned if
   *   connresetOnPeerShutdown applies and the peer reset the connection.
   *
   * Throws: TTransportException – NOT_OPEN if the socket is not open,
   *   TIMED_OUT if waiting for data timed out, UNKNOWN on any other socket
   *   error.
   */
  override size_t read(ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN));

    auto r = yieldOnBlock(socket_.receive(cast(void[])buf),
      TAsyncEventType.READ);

    // If recv went fine, immediately return.
    if (r >= 0) return r;

    // Something went wrong, find out how to handle it.
    auto lastErrno = getSocketErrno();

    static if (connresetOnPeerShutdown) {
      // Where this flag is set, a connection reset is reported to the caller
      // as a zero-length read instead of as an error.
      if (lastErrno == ECONNRESET) {
        return 0;
      }
    }

    throw new TTransportException("Receiving from socket failed: " ~
      socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
  }

  /**
   * Writes the whole buffer, calling writeSome() until every byte has been
   * sent.
   *
   * Params:
   *   buf = Data to send.
   *
   * Throws: Whatever writeSome() throws.
   */
  override void write(in ubyte[] buf) {
    size_t sent;
    while (sent < buf.length) {
      sent += writeSome(buf[sent .. $]);
    }
    assert(sent == buf.length);
  }

  /**
   * Sends as much of the buffer as the socket accepts in one go, yielding to
   * the async manager while the socket is not ready for writing.
   *
   * Params:
   *   buf = Data to send.
   *
   * Returns: The number of bytes actually sent (always greater than 0).
   *
   * Throws: TTransportException – NOT_OPEN if the socket is not open or the
   *   error indicates a closed connection (the socket is closed in that
   *   case), TIMED_OUT if waiting for the socket timed out, UNKNOWN on any
   *   other error or if send() reports 0 bytes written.
   */
  override size_t writeSome(in ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN));

    auto r = yieldOnBlock(socket_.send(buf), TAsyncEventType.WRITE);

    // Everything went well, just return the number of bytes written.
    if (r > 0) return r;

    // Handle error conditions.
    if (r < 0) {
      auto lastErrno = getSocketErrno();

      auto type = TTransportException.Type.UNKNOWN;
      if (isSocketCloseErrno(lastErrno)) {
        type = TTransportException.Type.NOT_OPEN;
        closeImmediately();
      }

      throw new TTransportException("Sending to socket failed: " ~
        socketErrnoString(lastErrno), type);
    }

    // send() should never return 0.
    throw new TTransportException("Sending to socket failed (0 bytes written).",
      TTransportException.Type.UNKNOWN);
  }

  /// The amount of time in which a connection must be established before the
  /// connect work item queued by open() gives up and closes the socket.
  Duration connectTimeout = dur!"seconds"(5);

private:
  /**
   * Closes the underlying socket right away and drops the reference to it.
   *
   * Safe to call when the socket has already been closed: the work item
   * queued by close() may run after an earlier work item (e.g. one that
   * timed out) has already torn the connection down.
   */
  void closeImmediately() {
    if (socket_ is null) return;
    socket_.close();
    socket_ = null;
  }

  /**
   * Evaluates a socket call and, as long as it fails because it would block,
   * suspends the current fiber until the socket is ready and retries.
   *
   * Must be called from within a fiber (checked by an assert).
   *
   * Params:
   *   call = The socket operation; re-evaluated on every retry.
   *   eventType = The kind of readiness to wait for. READ waits with
   *     recvTimeout_, WRITE with sendTimeout_.
   *
   * Returns: The first result of call that is not a would-block failure
   *   (this may still be Socket.ERROR with a different errno).
   *
   * Throws: TTransportException (TIMED_OUT) if the wait timed out; the socket
   *   is closed before throwing.
   */
  T yieldOnBlock(T)(lazy T call, TAsyncEventType eventType) {
    while (true) {
      auto result = call();
      if (result != Socket.ERROR || getSocketErrno() != WOULD_BLOCK_ERRNO) return result;

      // We got an EAGAIN result, register a callback to return here once some
      // event happens and yield.

      Duration timeout = void;
      final switch (eventType) {
        case TAsyncEventType.READ:
          timeout = recvTimeout_;
          break;
        case TAsyncEventType.WRITE:
          timeout = sendTimeout_;
          break;
      }

      auto fiber = Fiber.getThis();
      assert(fiber, "Current fiber null – not running in TAsyncManager?");
      TAsyncEventReason eventReason = void;
      asyncManager_.addOneshotListener(socket_, eventType, timeout,
        scopedDelegate((TAsyncEventReason reason) {
          eventReason = reason;
          fiber.call();
        })
      );

      // Yields execution back to the async manager, will return back here once
      // the above listener is called.
      Fiber.yield();

      if (eventReason == TAsyncEventReason.TIMED_OUT) {
        // If we are cancelling the request due to a timed out operation, the
        // connection is in an undefined state, because the server could decide
        // to send the requested data later, or we could have already been half-
        // way into writing a request. Thus, we close the connection to make any
        // possibly queued up work items fail immediately. Besides, the server
        // is not very likely to immediately recover after a socket-level
        // timeout has expired anyway.
        closeImmediately();

        throw new TTransportException("Timed out while waiting for socket " ~
          "to get ready to " ~ to!string(eventType) ~ ".",
          TTransportException.Type.TIMED_OUT);
      }
    }
  }

  /// The TAsyncSocketManager to use for non-blocking I/O.
  TAsyncSocketManager asyncManager_;
}
338 
private {
  // std.socket doesn't include SO_ERROR for reasons unknown, so the option
  // value is supplied per platform here. Windows is matched with
  // version (Windows) rather than version (Win32) so that 64-bit builds are
  // covered as well, in line with the connect import at the top of the file.
  version (linux) {
    enum SO_ERROR = 4;
  } else version (OSX) {
    enum SO_ERROR = 0x1007;
  } else version (FreeBSD) {
    enum SO_ERROR = 0x1007;
  } else version (Windows) {
    import std.c.windows.winsock : SO_ERROR;
  } else static assert(false, "Don't know SO_ERROR on this platform.");

  // This hack forces a delegate literal to be scoped, even if it is passed to
  // a function accepting normal delegates as well. DMD likes to allocate the
  // context on the heap anyway, but it seems to work for LDC.
  //
  // The delegate is returned unchanged; callers must make sure it is not
  // invoked after the enclosing stack frame has been left.
  import std.traits : isDelegate;
  auto scopedDelegate(D)(scope D d) if (isDelegate!D) {
    return d;
  }
}