1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 /**
21  * Defines the interface used for client-side handling of asynchronous
22  * I/O operations, based on coroutines.
23  *
24  * The main piece of the »client side« (e.g. for TAsyncClient users) of the
25  * API is TFuture, which represents an asynchronously executed operation,
26  * which can have a return value, throw exceptions, and which can be waited
27  * upon.
28  *
29  * On the »implementation side«, the idea is that by using a TAsyncTransport
30  * instead of a normal TTransport and executing the work through a
31  * TAsyncManager, the same code as for synchronous I/O can be used for
32  * asynchronous operation as well, for example:
33  *
34  * ---
35  * auto socket = new TAsyncSocket(someTAsyncSocketManager(), host, port);
36  * // …
37  * socket.asyncManager.execute(socket, {
38  *   SomeThriftStruct s;
39  *
40  *   // Waiting for socket I/O will not block an entire thread but cause
41  *   // the async manager to execute another task in the meantime, because
42  *   // we are using TAsyncSocket instead of TSocket.
43  *   s.read(socket);
44  *
45  *   // Do something with s, e.g. set a TPromise result to it.
46  *   writeln(s);
47  * });
48  * ---
49  */
50 module thrift.async.base;
51 
52 import core.time : Duration, dur;
53 import std.socket/+ : Socket+/; // DMD @@BUG314@@
54 import thrift.base;
55 import thrift.transport.base;
56 import thrift.util.cancellation;
57 
58 /**
59  * Manages one or more asynchronous transport resources (e.g. sockets in the
60  * case of TAsyncSocketManager) and allows work items to be submitted for them.
61  *
62  * Implementations will typically run one or more background threads for
63  * executing the work, which is one of the reasons for a TAsyncManager to be
64  * used. Each work item is run in its own fiber and is expected to yield() away
65  * while waiting for time-consuming operations.
66  *
67  * The second important purpose of TAsyncManager is to serialize access to
68  * the transport resources – without taking care of that, e.g. issuing multiple
69  * RPC calls over the same connection in rapid succession would likely lead to
70  * more than one request being written at the same time, causing only garbage
71  * to arrive at the remote end.
72  *
73  * All methods are thread-safe.
74  */
75 interface TAsyncManager {
76   /**
77    * Submits a work item to be executed asynchronously.
78    *
79    * Access to asnyc transports is serialized – if two work items associated
80    * with the same transport are submitted, the second delegate will not be
81    * invoked until the first has returned, even it the latter context-switches
82    * away (because it is waiting for I/O) and the async manager is idle
83    * otherwise.
84    *
85    * Optionally, a TCancellation instance can be specified. If present,
86    * triggering it will be considered a request to cancel the work item, if it
87    * is still waiting for the associated transport to become available.
88    * Delegates which are already being processed (i.e. waiting for I/O) are not
89    * affected because this would bring the connection into an undefined state
90    * (as probably half-written request or a half-read response would be left
91    * behind).
92    *
93    * Params:
94    *   transport = The TAsyncTransport the work delegate will operate on. Must
95    *     be associated with this TAsyncManager instance.
96    *   work = The operations to execute on the given transport. Must never
97    *     throw, errors should be handled in another way. nothrow semantics are
98    *     difficult to enforce in combination with fibres though, so currently
99    *     exceptions are just swallowed by TAsyncManager implementations.
100    *   cancellation = If set, can be used to request cancellatinon of this work
101    *     item if it is still waiting to be executed.
102    *
103    * Note: The work item will likely be executed in a different thread, so make
104    *   sure the code it relies on is thread-safe. An exception are the async
105    *   transports themselves, to which access is serialized as noted above.
106    */
107   void execute(TAsyncTransport transport, void delegate() work,
108     TCancellation cancellation = null
109   ) in {
110     assert(transport.asyncManager is this,
111       "The given transport must be associated with this TAsyncManager.");
112   }
113 
114   /**
115    * Submits a delegate to be executed after a certain amount of time has
116    * passed.
117    *
118    * The actual amount of time elapsed can be higher if the async manager
119    * instance is busy and thus should not be relied on. The
120    *
121    * Params:
122    *   duration = The amount of time to wait before starting to execute the
123    *     work delegate.
124    *   work = The code to execute after the specified amount of time has passed.
125    *
126    * Example:
127    * ---
128    * // A very basic example – usually, the actuall work item would enqueue
129    * // some async transport operation.
130    * auto asyncMangager = someAsyncManager();
131    *
132    * TFuture!int calculate() {
133    *   // Create a promise and asynchronously set its value after three
134    *   // seconds have passed.
135    *   auto promise = new TPromise!int;
136    *   asyncManager.delay(dur!"seconds"(3), {
137    *     promise.succeed(42);
138    *   });
139    *
140    *   // Immediately return it to the caller.
141    *   return promise;
142    * }
143    *
144    * // This will wait until the result is available and then print it.
145    * writeln(calculate().waitGet());
146    * ---
147    */
148   void delay(Duration duration, void delegate() work);
149 
150   /**
151    * Shuts down all background threads or other facilities that might have
152    * been started in order to execute work items. This function is typically
153    * called during program shutdown.
154    *
155    * If there are still tasks to be executed when the timeout expires, any
156    * currently executed work items will never receive any notifications
157    * for async transports managed by this instance, queued work items will
158    * be silently dropped, and implementations are allowed to leak resources.
159    *
160    * Params:
161    *   waitFinishTimeout = If positive, waits for all work items to be
162    *     finished for the specified amount of time, if negative, waits for
163    *     completion without ever timing out, if zero, immediately shuts down
164    *     the background facilities.
165    */
166   bool stop(Duration waitFinishTimeout = dur!"hnsecs"(-1));
167 }
168 
169 /**
170  * A TTransport which uses a TAsyncManager to schedule non-blocking operations.
171  *
172  * The actual type of device is not specified; typically, implementations will
173  * depend on an interface derived from TAsyncManager to be notified of changes
174  * in the transport state.
175  *
176  * The peeking, reading, writing and flushing methods must always be called
177  * from within the associated async manager.
178  */
179 interface TAsyncTransport : TTransport {
180   /**
181    * The TAsyncManager associated with this transport.
182    */
183   TAsyncManager asyncManager() @property;
184 }
185 
186 /**
187  * A TAsyncManager providing notificiations for socket events.
188  */
189 interface TAsyncSocketManager : TAsyncManager {
190   /**
191    * Adds a listener that is triggered once when an event of the specified type
192    * occurs, and removed afterwards.
193    *
194    * Params:
195    *   socket = The socket to listen for events at.
196    *   eventType = The type of the event to listen for.
197    *   timeout = The period of time after which the listener will be called
198    *     with TAsyncEventReason.TIMED_OUT if no event happened.
199    *   listener = The delegate to call when an event happened.
200    */
201   void addOneshotListener(Socket socket, TAsyncEventType eventType,
202     Duration timeout, TSocketEventListener listener);
203 
204   /// Ditto
205   void addOneshotListener(Socket socket, TAsyncEventType eventType,
206     TSocketEventListener listener);
207 }
208 
209 /**
210  * Types of events that can happen for an asynchronous transport.
211  */
212 enum TAsyncEventType {
213   READ, /// New data became available to read.
214   WRITE /// The transport became ready to be written to.
215 }
216 
217 /**
218  * The type of the delegates used to register socket event handlers.
219  */
220 alias void delegate(TAsyncEventReason callReason) TSocketEventListener;
221 
222 /**
223  * The reason a listener was called.
224  */
225 enum TAsyncEventReason : byte {
226   NORMAL, /// The event listened for was triggered normally.
227   TIMED_OUT /// A timeout for the event was set, and it expired.
228 }