1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.async.socket;
20 
21 import core.thread : Fiber;
22 import core.time : dur, Duration;
23 import std.array : empty;
24 import std.conv : to;
25 import std.exception : enforce;
26 import std.socket;
27 import thrift.base;
28 import thrift.async.base;
29 import thrift.transport.base;
30 import thrift.transport.socket : TSocketBase;
31 import thrift.internal.endian;
32 import thrift.internal.socket;
33 
34 version (Windows) {
35   import core.sys.windows.winsock2 : connect;
36 } else version (Posix) {
37   import core.sys.posix.sys.socket : connect;
38 } else static assert(0, "Don't know connect on this platform.");
39 
40 version (Win32) {
41   import core.stdc.config : __c_long;
42 }
43 
44 /**
45  * Non-blocking socket implementation of the TTransport interface.
46  *
47  * Whenever a socket operation would block, TAsyncSocket registers a callback
48  * with the specified TAsyncSocketManager and yields.
49  *
50  * As for thrift.transport.socket, due to the limitations of std.socket,
51  * currently only TCP/IP sockets are supported (i.e. Unix domain sockets are
52  * not).
53  */
class TAsyncSocket : TSocketBase, TAsyncTransport {
  /**
   * Constructor that takes an already created, connected (!) socket.
   *
   * Params:
   *   asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
   *   socket = Already created, connected socket object. Will be switched to
   *     non-blocking mode if it isn't already.
   */
  this(TAsyncSocketManager asyncManager, Socket socket) {
    asyncManager_ = asyncManager;
    socket.blocking = false;
    super(socket);
  }

  /**
   * Creates a new unconnected socket that will connect to the given host
   * on the given port.
   *
   * Params:
   *   asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
   *   host = Remote host.
   *   port = Remote port.
   */
  this(TAsyncSocketManager asyncManager, string host, ushort port) {
    asyncManager_ = asyncManager;
    super(host, port);
  }

  /// The async manager passed to the constructor.
  override TAsyncManager asyncManager() @property {
    return asyncManager_;
  }

  /**
   * Asynchronously connects the socket.
   *
   * Completes without blocking and defers further operations on the socket
   * until the connection is established. If the background connection attempt
   * fails or times out, this is currently not indicated in any way other than
   * the socket being closed again, so that subsequent read/write calls fail.
   *
   * Throws: TTransportException (NOT_OPEN) if host or port are not set, if
   *   the host cannot be resolved, or if connect() fails right away with
   *   anything other than the "in progress" error code. In the last case the
   *   freshly created socket is closed again before throwing.
   */
  override void open() {
    if (isOpen) return;

    enforce(!host_.empty, new TTransportException(
      "Cannot open null host.", TTransportException.Type.NOT_OPEN));
    enforce(port_ != 0, new TTransportException(
      "Cannot open with null port.", TTransportException.Type.NOT_OPEN));

    // Cannot use std.socket.Socket.connect here because it hides away
    // EINPROGRESS/WSAWOULDBLOCK.
    Address addr;
    try {
      // Currently, we just go with the first address returned, could be made
      // more intelligent though – IPv6?
      auto candidates = getAddress(host_, port_);
      // An empty result would otherwise surface as a RangeError from the
      // index below, which the catch clause does not cover.
      enforce(!candidates.empty, "Host lookup returned no addresses.");
      addr = candidates[0];
    } catch (Exception e) {
      throw new TTransportException(`Unable to resolve host "` ~ host_ ~ `".`,
        TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
    }

    socket_ = new TcpSocket(addr.addressFamily);
    socket_.blocking = false;
    setSocketOpts();

    auto errorCode = connect(socket_.handle, addr.name(), addr.nameLen());
    if (errorCode == 0) {
      // If the connection could be established immediately, just return. I
      // don't know if this ever happens.
      return;
    }

    // Fetch the error code before touching the socket again, as closing it
    // might overwrite the thread's last socket error.
    auto connectErrno = getSocketErrno();
    if (connectErrno != CONNECT_INPROGRESS_ERRNO) {
      // Do not leave a dead socket behind: release the handle and reset
      // socket_, so that a later open() call starts from scratch.
      closeImmediately();
      throw new TTransportException(`Could not establish connection to "` ~
        host_ ~ `": ` ~ socketErrnoString(connectErrno),
        TTransportException.Type.NOT_OPEN);
    }

    // This is the expected case: connect() signalled that the connection
    // is being established in the background. Queue up a work item with the
    // async manager which just defers any other operations on this
    // TAsyncSocket instance until the socket is ready.
    asyncManager_.execute(this,
      {
        auto fiber = Fiber.getThis();
        TAsyncEventReason reason = void;
        // Resume this fiber once the socket becomes writable or the connect
        // timeout expires, whichever happens first.
        asyncManager_.addOneshotListener(socket_, TAsyncEventType.WRITE,
          connectTimeout,
          scopedDelegate((TAsyncEventReason r){ reason = r; fiber.call(); })
        );
        Fiber.yield();

        if (reason == TAsyncEventReason.TIMED_OUT) {
          // Close the connection, so that subsequent work items fail immediately.
          closeImmediately();
          return;
        }

        // The socket being reported writable does not mean that connecting
        // succeeded; the outcome has to be fetched via SO_ERROR.
        version (Win32) {
          __c_long errorCode = void;
        } else {
          int errorCode = void;
        }
        socket_.getOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_ERROR,
          errorCode);

        if (errorCode) {
          logInfo("Could not connect TAsyncSocket: %s",
            socketErrnoString(errorCode));

          // Close the connection, so that subsequent work items fail immediately.
          closeImmediately();
          return;
        }
      }
    );
  }

  /**
   * Closes the socket.
   *
   * Queues a work item with the async manager that closes the socket and
   * blocks the calling thread until that work item has run (presumably after
   * all operations queued earlier for this transport – this depends on the
   * ordering guarantees of TAsyncManager.execute).
   */
  override void close() {
    if (!isOpen) return;

    import core.sync.condition;
    import core.sync.mutex;

    auto doneMutex = new Mutex;
    auto doneCond = new Condition(doneMutex);

    // Explicit completion flag, guarded by doneMutex. Waiting on the bare
    // condition is not enough: a spurious wakeup would let close() return
    // (and invalidate the stack context of the scoped delegate below) before
    // the work item ran, and a notification sent before wait() is entered
    // would be lost.
    bool done = false;
    synchronized (doneMutex) {
      asyncManager_.execute(this,
        scopedDelegate(
          {
            closeImmediately();
            synchronized (doneMutex) {
              done = true;
              doneCond.notifyAll();
            }
          }
        )
      );
      while (!done) doneCond.wait();
    }
  }

  /**
   * Checks, without consuming any data, whether a byte can be read from the
   * socket.
   *
   * Returns: false if the socket is not open, if receive() reports zero
   *   bytes, or – where connresetOnPeerShutdown is set – if it fails with
   *   ECONNRESET (the socket is closed in that case); true if at least one
   *   byte is available.
   *
   * Throws: TTransportException (UNKNOWN) for any other receive() error.
   */
  override bool peek() {
    if (!isOpen) return false;

    // NOTE(review): unlike read(), this receive() is not routed through
    // yieldOnBlock(), so on this non-blocking socket a "would block" result
    // presumably ends up in the exception below – confirm this is intended.
    ubyte buf;
    auto r = socket_.receive((&buf)[0..1], SocketFlags.PEEK);
    if (r == Socket.ERROR) {
      auto lastErrno = getSocketErrno();
      static if (connresetOnPeerShutdown) {
        if (lastErrno == ECONNRESET) {
          closeImmediately();
          return false;
        }
      }
      throw new TTransportException("Peeking into socket failed: " ~
        socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
    }
    return (r > 0);
  }

  /**
   * Reads up to buf.length bytes from the socket, yielding the current fiber
   * while the operation would block.
   *
   * Params:
   *   buf = Destination buffer.
   *
   * Returns: The number of bytes received. Where connresetOnPeerShutdown is
   *   set, an ECONNRESET error is reported as 0 bytes read.
   *
   * Throws: TTransportException (NOT_OPEN) if the socket is not open,
   *   (TIMED_OUT) if waiting for data times out – the socket is closed in
   *   that case – and (UNKNOWN) for any other receive() error.
   */
  override size_t read(ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN));

    auto r = yieldOnBlock(socket_.receive(cast(void[])buf),
      TAsyncEventType.READ);

    // If recv went fine, immediately return.
    if (r >= 0) return r;

    // Something went wrong, find out how to handle it.
    auto lastErrno = getSocketErrno();

    static if (connresetOnPeerShutdown) {
      // connresetOnPeerShutdown is declared outside of this file; it
      // presumably marks platforms which report a peer shutdown as
      // ECONNRESET, hence this is mapped to "no more data".
      if (lastErrno == ECONNRESET) {
        return 0;
      }
    }

    throw new TTransportException("Receiving from socket failed: " ~
      socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
  }

  /**
   * Writes the whole buffer to the socket by calling writeSome() until all
   * bytes have been handed off.
   *
   * Params:
   *   buf = Data to send. An empty buffer results in no socket call at all.
   *
   * Throws: Whatever writeSome() throws.
   */
  override void write(in ubyte[] buf) {
    size_t sent;
    while (sent < buf.length) {
      sent += writeSome(buf[sent .. $]);
    }
    assert(sent == buf.length);
  }

  /**
   * Sends as much of the buffer as a single send() call accepts, yielding
   * the current fiber while the operation would block.
   *
   * Params:
   *   buf = Data to send.
   *
   * Returns: The number of bytes written, always greater than zero.
   *
   * Throws: TTransportException (NOT_OPEN) if the socket is not open or if
   *   send() fails with an error for which isSocketCloseErrno() holds (the
   *   socket is closed in that case), (TIMED_OUT) if waiting for the socket
   *   to become writable times out, and (UNKNOWN) for any other send() error
   *   or if send() reports zero bytes written.
   */
  override size_t writeSome(in ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN));

    auto r = yieldOnBlock(socket_.send(buf), TAsyncEventType.WRITE);

    // Everything went well, just return the number of bytes written.
    if (r > 0) return r;

    // Handle error conditions.
    if (r < 0) {
      auto lastErrno = getSocketErrno();

      auto type = TTransportException.Type.UNKNOWN;
      if (isSocketCloseErrno(lastErrno)) {
        type = TTransportException.Type.NOT_OPEN;
        closeImmediately();
      }

      throw new TTransportException("Sending to socket failed: " ~
        socketErrnoString(lastErrno), type);
    }

    // send() should never return 0.
    throw new TTransportException("Sending to socket failed (0 bytes written).",
      TTransportException.Type.UNKNOWN);
  }

  /// The amount of time in which a connection must be established before the
  /// connection attempt started by open() is abandoned and the socket closed.
  Duration connectTimeout = dur!"seconds"(5);

private:
  /**
   * Closes the underlying socket right away and resets socket_ to null.
   *
   * Does nothing if there is no socket, so that work items which were queued
   * while the socket was still open (such as the one from close()) can safely
   * run after a timeout or error has already closed it.
   */
  void closeImmediately() {
    if (socket_ is null) return;
    socket_.close();
    socket_ = null;
  }

  /**
   * Evaluates the given socket call and, as long as it fails with the "would
   * block" error code, suspends the current fiber until the async manager
   * reports the socket ready for the given event type, then tries again.
   *
   * Params:
   *   call = The socket operation; re-evaluated on every attempt.
   *   eventType = The event to wait for. Also selects the timeout:
   *     recvTimeout_ for READ, sendTimeout_ for WRITE.
   *
   * Returns: The first result of call which is not a "would block" error
   *   (this might still be Socket.ERROR for a different reason).
   *
   * Throws: TTransportException (TIMED_OUT) if the wait times out; the
   *   socket is closed before throwing.
   */
  T yieldOnBlock(T)(lazy T call, TAsyncEventType eventType) {
    while (true) {
      auto result = call();
      if (result != Socket.ERROR || getSocketErrno() != WOULD_BLOCK_ERRNO) return result;

      // We got an EAGAIN result, register a callback to return here once some
      // event happens and yield.

      Duration timeout = void;
      final switch (eventType) {
        case TAsyncEventType.READ:
          timeout = recvTimeout_;
          break;
        case TAsyncEventType.WRITE:
          timeout = sendTimeout_;
          break;
      }

      auto fiber = Fiber.getThis();
      assert(fiber, "Current fiber null – not running in TAsyncManager?");
      TAsyncEventReason eventReason = void;
      asyncManager_.addOneshotListener(socket_, eventType, timeout,
        scopedDelegate((TAsyncEventReason reason) {
          eventReason = reason;
          fiber.call();
        })
      );

      // Yields execution back to the async manager, will return back here once
      // the above listener is called.
      Fiber.yield();

      if (eventReason == TAsyncEventReason.TIMED_OUT) {
        // If we are cancelling the request due to a timed out operation, the
        // connection is in an undefined state, because the server could decide
        // to send the requested data later, or we could have already been half-
        // way into writing a request. Thus, we close the connection to make any
        // possibly queued up work items fail immediately. Besides, the server
        // is not very likely to immediately recover after a socket-level
        // timeout has expired anyway.
        closeImmediately();

        throw new TTransportException("Timed out while waiting for socket " ~
          "to get ready to " ~ to!string(eventType) ~ ".",
          TTransportException.Type.TIMED_OUT);
      }
    }
  }

  /// The TAsyncSocketManager to use for non-blocking I/O.
  TAsyncSocketManager asyncManager_;
}
345 
private {
  // std.socket doesn't include SO_ERROR for reasons unknown, so the value is
  // supplied here for each platform. The explicit constants are kept for the
  // platforms that were handled originally; any other Posix system takes the
  // constant from druntime's socket bindings instead of failing the build.
  version (linux) {
    enum SO_ERROR = 4;
  } else version (OSX) {
    enum SO_ERROR = 0x1007;
  } else version (FreeBSD) {
    enum SO_ERROR = 0x1007;
  } else version (Windows) {
    import core.sys.windows.winsock2 : SO_ERROR;
  } else version (Posix) {
    import core.sys.posix.sys.socket : SO_ERROR;
  } else static assert(false, "Don't know SO_ERROR on this platform.");

  // This hack forces a delegate literal to be scoped, even if it is passed to
  // a function accepting normal delegates as well. DMD likes to allocate the
  // context on the heap anyway, but it seems to work for LDC.
  //
  // The function itself simply hands back the delegate it was given; callers
  // must make sure that the stack frame the delegate refers to stays alive
  // for as long as the delegate may be invoked.
  import std.traits : isDelegate;
  auto scopedDelegate(D)(scope D d) if (isDelegate!D) {
    return d;
  }
}