1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.util.future;
20 
21 import core.atomic;
22 import core.sync.condition;
23 import core.sync.mutex;
24 import core.time : Duration;
25 import std.array : empty, front, popFront;
26 import std.conv : to;
27 import std.exception : enforce;
28 import std.traits : BaseTypeTuple, isSomeFunction, ParameterTypeTuple, ReturnType;
29 import thrift.base;
30 import thrift.util.awaitable;
31 import thrift.util.cancellation;
32 
33 /**
34  * Represents an operation which is executed asynchronously and the result of
35  * which will become available at some point in the future.
36  *
37  * Once a operation is completed, the result of the operation can be fetched
38  * via the get() family of methods. There are three possible cases: Either the
39  * operation succeeded, then its return value is returned, or it failed by
40  * throwing, in which case the exception is rethrown, or it was cancelled
41  * before, then a TCancelledException is thrown. There might be TFuture
42  * implementations which never possibly enter the cancelled state.
43  *
44  * All methods are thread-safe, but keep in mind that any exception object or
45  * result (if it is a reference type, of course) is shared between all
46  * get()-family invocations.
47  */
48 interface TFuture(ResultType) {
49   /**
50    * The status the operation is currently in.
51    *
52    * An operation starts out in RUNNING status, and changes state to one of the
53    * others at most once afterwards.
54    */
55   TFutureStatus status() @property;
56 
57   /**
58    * A TAwaitable triggered when the operation leaves the RUNNING status.
59    */
60   TAwaitable completion() @property;
61 
62   /**
63    * Convenience shorthand for waiting until the result is available and then
64    * get()ing it.
65    *
66    * If the operation has already completed, the result is immediately
67    * returned.
68    *
69    * The result of this method is »alias this«'d to the interface, so that
70    * TFuture can be used as a drop-in replacement for a simple value in
71    * synchronous code.
72    */
73   final ResultType waitGet() {
74     completion.wait();
75     return get();
76   }
77   final @property auto waitGetProperty() { return waitGet(); }
78   alias waitGetProperty this;
79 
80   /**
81    * Convenience shorthand for waiting until the result is available and then
82    * get()ing it.
83    *
84    * If the operation completes in time, returns its result (resp. throws an
85    * exception for the failed/cancelled cases). If not, throws a
86    * TFutureException.
87    */
88   final ResultType waitGet(Duration timeout) {
89     enforce(completion.wait(timeout), new TFutureException(
90       "Operation did not complete in time."));
91     return get();
92   }
93 
94   /**
95    * Returns the result of the operation.
96    *
97    * Throws: TFutureException if the operation has been cancelled,
98    *   TCancelledException if it is not yet done; the set exception if it
99    *   failed.
100    */
101   ResultType get();
102 
103   /**
104    * Returns the captured exception if the operation failed, or null otherwise.
105    *
106    * Throws: TFutureException if not yet done, TCancelledException if the
107    *   operation has been cancelled.
108    */
109   Exception getException();
110 }
111 
112 /**
113  * The states the operation offering a future interface can be in.
114  */
115 enum TFutureStatus : byte {
116   RUNNING, /// The operation is still running.
117   SUCCEEDED, /// The operation completed without throwing an exception.
118   FAILED, /// The operation completed by throwing an exception.
119   CANCELLED /// The operation was cancelled.
120 }
121 
122 /**
123  * A TFuture covering the simple but common case where the result is simply
124  * set by a call to succeed()/fail().
125  *
126  * All methods are thread-safe, but usually, succeed()/fail() are only called
127  * from a single thread (different from the thread(s) waiting for the result
128  * using the TFuture interface, though).
129  */
130 class TPromise(ResultType) : TFuture!ResultType {
131   this() {
132     statusMutex_ = new Mutex;
133     completionEvent_ = new TOneshotEvent;
134   }
135 
136   override S status() const @property {
137     return atomicLoad(status_);
138   }
139 
140   override TAwaitable completion() @property {
141     return completionEvent_;
142   }
143 
144   override ResultType get() {
145     auto s = atomicLoad(status_);
146     enforce(s != S.RUNNING,
147       new TFutureException("Operation not yet completed."));
148 
149     if (s == S.CANCELLED) throw new TCancelledException;
150     if (s == S.FAILED) throw exception_;
151 
152     static if (!is(ResultType == void)) {
153       return result_;
154     }
155   }
156 
157   override Exception getException() {
158     auto s = atomicLoad(status_);
159     enforce(s != S.RUNNING,
160       new TFutureException("Operation not yet completed."));
161 
162     if (s == S.CANCELLED) throw new TCancelledException;
163     if (s == S.SUCCEEDED) return null;
164 
165     return exception_;
166   }
167 
168   static if (!is(ResultType == void)) {
169     /**
170      * Sets the result of the operation, marks it as done, and notifies any
171      * waiters.
172      *
173      * If the operation has been cancelled before, nothing happens.
174      *
175      * Throws: TFutureException if the operation is already completed.
176      */
177     void succeed(ResultType result) {
178       synchronized (statusMutex_) {
179         auto s = atomicLoad(status_);
180         if (s == S.CANCELLED) return;
181 
182         enforce(s == S.RUNNING,
183           new TFutureException("Operation already completed."));
184         result_ = result;
185 
186         atomicStore(status_, S.SUCCEEDED);
187       }
188 
189       completionEvent_.trigger();
190     }
191   } else {
192     void succeed() {
193       synchronized (statusMutex_) {
194         auto s = atomicLoad(status_);
195         if (s == S.CANCELLED) return;
196 
197         enforce(s == S.RUNNING,
198           new TFutureException("Operation already completed."));
199 
200         atomicStore(status_, S.SUCCEEDED);
201       }
202 
203       completionEvent_.trigger();
204     }
205   }
206 
207   /**
208    * Marks the operation as failed with the specified exception and notifies
209    * any waiters.
210    *
211    * If the operation was already cancelled, nothing happens.
212    *
213    * Throws: TFutureException if the operation is already completed.
214    */
215   void fail(Exception exception) {
216     synchronized (statusMutex_) {
217       auto status = atomicLoad(status_);
218       if (status == S.CANCELLED) return;
219 
220       enforce(status == S.RUNNING,
221         new TFutureException("Operation already completed."));
222       exception_ = exception;
223 
224       atomicStore(status_, S.FAILED);
225     }
226 
227     completionEvent_.trigger();
228   }
229 
230 
231   /**
232    * Marks this operation as completed and takes over the outcome of another
233    * TFuture of the same type.
234    *
235    * If this operation was already cancelled, nothing happens. If the other
236    * operation was cancelled, this operation is marked as failed with a
237    * TCancelledException.
238    *
239    * Throws: TFutureException if the passed in future was not completed or
240    *   this operation is already completed.
241    */
242   void complete(TFuture!ResultType future) {
243     synchronized (statusMutex_) {
244       auto status = atomicLoad(status_);
245       if (status == S.CANCELLED) return;
246       enforce(status == S.RUNNING,
247         new TFutureException("Operation already completed."));
248 
249       enforce(future.status != S.RUNNING, new TFutureException(
250         "The passed TFuture is not yet completed."));
251 
252       status = future.status;
253       if (status == S.CANCELLED) {
254         status = S.FAILED;
255         exception_ = new TCancelledException;
256       } else if (status == S.FAILED) {
257         exception_ = future.getException();
258       } else static if (!is(ResultType == void)) {
259         result_ = future.get();
260       }
261 
262       atomicStore(status_, status);
263     }
264 
265     completionEvent_.trigger();
266   }
267 
268   /**
269    * Marks this operation as cancelled and notifies any waiters.
270    *
271    * If the operation is already completed, nothing happens.
272    */
273   void cancel() {
274     synchronized (statusMutex_) {
275       auto status = atomicLoad(status_);
276       if (status == S.RUNNING) atomicStore(status_, S.CANCELLED);
277     }
278 
279     completionEvent_.trigger();
280   }
281 
282 private:
283   // Convenience alias because TFutureStatus is ubiquitous in this class.
284   alias TFutureStatus S;
285 
286   // The status the promise is currently in.
287   shared S status_;
288 
289   union {
290     static if (!is(ResultType == void)) {
291       // Set if status_ is SUCCEEDED.
292       ResultType result_;
293     }
294     // Set if status_ is FAILED.
295     Exception exception_;
296   }
297 
298   // Protects status_.
299   // As for result_ and exception_: They are only set once, while status_ is
300   // still RUNNING, so given that the operation has already completed, reading
301   // them is safe without holding some kind of lock.
302   Mutex statusMutex_;
303 
304   // Triggered when the event completes.
305   TOneshotEvent completionEvent_;
306 }
307 
308 ///
309 class TFutureException : TException {
310   ///
311   this(string msg = "", string file = __FILE__, size_t line = __LINE__,
312     Throwable next = null)
313   {
314     super(msg, file, line, next);
315   }
316 }
317 
318 /**
319  * Creates an interface that is similar to a given one, but accepts an
320  * additional, optional TCancellation parameter each method, and returns
321  * TFutures instead of plain return values.
322  *
323  * For example, given the following declarations:
324  * ---
325  * interface Foo {
326  *   void bar();
327  *   string baz(int a);
328  * }
329  * alias TFutureInterface!Foo FutureFoo;
330  * ---
331  *
332  * FutureFoo would be equivalent to:
333  * ---
334  * interface FutureFoo {
335  *   TFuture!void bar(TCancellation cancellation = null);
336  *   TFuture!string baz(int a, TCancellation cancellation = null);
337  * }
338  * ---
339  */
340 template TFutureInterface(Interface) if (is(Interface _ == interface)) {
341   mixin({
342     string code = "interface TFutureInterface \n";
343 
344     static if (is(Interface Bases == super) && Bases.length > 0) {
345       code ~= ": ";
346       foreach (i; 0 .. Bases.length) {
347         if (i > 0) code ~= ", ";
348         code ~= "TFutureInterface!(BaseTypeTuple!Interface[" ~ to!string(i) ~ "]) ";
349       }
350     }
351 
352     code ~= "{\n";
353 
354     foreach (methodName; __traits(derivedMembers, Interface)) {
355       enum qn = "Interface." ~ methodName;
356       static if (isSomeFunction!(mixin(qn))) {
357         code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~
358           "(ParameterTypeTuple!(" ~ qn ~ "), TCancellation cancellation = null);\n";
359       }
360     }
361 
362     code ~= "}\n";
363     return code;
364   }());
365 }
366 
367 /**
368  * An input range that aggregates results from multiple asynchronous operations,
369  * returning them in the order they arrive.
370  *
371  * Additionally, a timeout can be set after which results from not yet finished
372  * futures will no longer be waited for, e.g. to ensure the time it takes to
373  * iterate over a set of results is limited.
374  */
375 final class TFutureAggregatorRange(T) {
376   /**
377    * Constructs a new instance.
378    *
379    * Params:
380    *   futures = The set of futures to collect results from.
381    *   timeout = If positive, not yet finished futures will be cancelled and
382    *     their results will not be taken into account.
383    */
384   this(TFuture!T[] futures, TCancellationOrigin childCancellation,
385     Duration timeout = dur!"hnsecs"(0)
386   ) {
387     if (timeout > dur!"hnsecs"(0)) {
388       timeoutSysTick_ = TickDuration.currSystemTick +
389         TickDuration.from!"hnsecs"(timeout.total!"hnsecs");
390     } else {
391       timeoutSysTick_ = TickDuration(0);
392     }
393 
394     queueMutex_ = new Mutex;
395     queueNonEmptyCondition_ = new Condition(queueMutex_);
396     futures_ = futures;
397     childCancellation_ = childCancellation;
398 
399     foreach (future; futures_) {
400       future.completion.addCallback({
401         auto f = future;
402         return {
403           if (f.status == TFutureStatus.CANCELLED) return;
404           assert(f.status != TFutureStatus.RUNNING);
405 
406           synchronized (queueMutex_) {
407             completedQueue_ ~= f;
408 
409             if (completedQueue_.length == 1) {
410               queueNonEmptyCondition_.notifyAll();
411             }
412           }
413         };
414       }());
415     }
416   }
417 
418   /**
419    * Whether the range is empty.
420    *
421    * This is the case if the results from the completed futures not having
422    * failed have already been popped and either all future have been finished
423    * or the timeout has expired.
424    *
425    * Potentially blocks until a new result is available or the timeout has
426    * expired.
427    */
428   bool empty() @property {
429     if (finished_) return true;
430     if (bufferFilled_) return false;
431 
432     while (true) {
433       TFuture!T future;
434       synchronized (queueMutex_) {
435         // The while loop is just being cautious about spurious wakeups, in
436         // case they should be possible.
437         while (completedQueue_.empty) {
438           auto remaining = to!Duration(timeoutSysTick_ -
439             TickDuration.currSystemTick);
440 
441           if (remaining <= dur!"hnsecs"(0)) {
442             // No time left, but still no element received – we are empty now.
443             finished_ = true;
444             childCancellation_.trigger();
445             return true;
446           }
447 
448           queueNonEmptyCondition_.wait(remaining);
449         }
450 
451         future = completedQueue_.front;
452         completedQueue_.popFront();
453       }
454 
455       ++completedCount_;
456       if (completedCount_ == futures_.length) {
457         // This was the last future in the list, there is no possibility
458         // another result could ever become available.
459         finished_ = true;
460       }
461 
462       if (future.status == TFutureStatus.FAILED) {
463         // This one failed, loop again and try getting another item from
464         // the queue.
465         exceptions_ ~= future.getException();
466       } else {
467         resultBuffer_ = future.get();
468         bufferFilled_ = true;
469         return false;
470       }
471     }
472   }
473 
474   /**
475    * Returns the first element from the range.
476    *
477    * Potentially blocks until a new result is available or the timeout has
478    * expired.
479    *
480    * Throws: TException if the range is empty.
481    */
482   T front() {
483     enforce(!empty, new TException(
484       "Cannot get front of an empty future aggregator range."));
485     return resultBuffer_;
486   }
487 
488   /**
489    * Removes the first element from the range.
490    *
491    * Potentially blocks until a new result is available or the timeout has
492    * expired.
493    *
494    * Throws: TException if the range is empty.
495    */
496   void popFront() {
497     enforce(!empty, new TException(
498       "Cannot pop front of an empty future aggregator range."));
499     bufferFilled_ = false;
500   }
501 
502   /**
503    * The number of futures the result of which has been returned or which have
504    * failed so far.
505    */
506   size_t completedCount() @property const {
507     return completedCount_;
508   }
509 
510   /**
511    * The exceptions collected from failed TFutures so far.
512    */
513   Exception[] exceptions() @property {
514     return exceptions_;
515   }
516 
517 private:
518   TFuture!T[] futures_;
519   TCancellationOrigin childCancellation_;
520 
521   // The system tick this operation will time out, or zero if no timeout has
522   // been set.
523   TickDuration timeoutSysTick_;
524 
525   bool finished_;
526 
527   bool bufferFilled_;
528   T resultBuffer_;
529 
530   Exception[] exceptions_;
531   size_t completedCount_;
532 
533   // The queue of completed futures. This (and the associated condition) are
534   // the only parts of this class that are accessed by multiple threads.
535   TFuture!T[] completedQueue_;
536   Mutex queueMutex_;
537   Condition queueNonEmptyCondition_;
538 }
539 
540 /**
541  * TFutureAggregatorRange construction helper to avoid having to explicitly
542  * specify the value type, i.e. to allow the constructor being called using IFTI
543  * (see $(DMDBUG 6082, D Bugzilla enhancement requet 6082)).
544  */
545 TFutureAggregatorRange!T tFutureAggregatorRange(T)(TFuture!T[] futures,
546   TCancellationOrigin childCancellation, Duration timeout = dur!"hnsecs"(0)
547 ) {
548   return new TFutureAggregatorRange!T(futures, childCancellation, timeout);
549 }