1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 /**
21  * Utilities for asynchronously querying multiple servers, building on
22  * TAsyncClient.
23  *
24  * Terminology note: The names of the artifacts defined in this module are
25  *   derived from »client pool«, because they operate on a pool of
26  *   TAsyncClients. However, from a architectural point of view, they often
27  *   represent a pool of hosts a Thrift client application communicates with
28  *   using RPC calls.
29  */
30 module thrift.codegen.async_client_pool;
31 
32 import core.sync.mutex;
33 import core.time : Duration, dur;
34 import std.algorithm : map;
35 import std.array : array, empty;
36 import std.exception : enforce;
37 import std.traits : ParameterTypeTuple, ReturnType;
38 import thrift.base;
39 import thrift.codegen.base;
40 import thrift.codegen.async_client;
41 import thrift.internal.algorithm;
42 import thrift.internal.codegen;
43 import thrift.util.awaitable;
44 import thrift.util.cancellation;
45 import thrift.util.future;
46 import thrift.internal.resource_pool;
47 
48 /**
49  * Represents a generic client pool which implements TFutureInterface!Interface
50  * using multiple TAsyncClients.
51  */
52 interface TAsyncClientPoolBase(Interface) if (isService!Interface) :
53   TFutureInterface!Interface
54 {
55   /// Shorthand for the client type this pool operates on.
56   alias TAsyncClientBase!Interface Client;
57 
58   /**
59    * Adds a client to the pool.
60    */
61   void addClient(Client client);
62 
63   /**
64    * Removes a client from the pool.
65    *
66    * Returns: Whether the client was found in the pool.
67    */
68   bool removeClient(Client client);
69 
70   /**
71    * Called to determine whether an exception comes from a client from the
72    * pool not working properly, or if it an exception thrown at the
73    * application level.
74    *
75    * If the delegate returns true, the server/connection is considered to be
76    * at fault, if it returns false, the exception is just passed on to the
77    * caller.
78    *
79    * By default, returns true for instances of TTransportException and
80    * TApplicationException, false otherwise.
81    */
82   bool delegate(Exception) rpcFaultFilter() const @property;
83   void rpcFaultFilter(bool delegate(Exception)) @property; /// Ditto
84 
85   /**
86    * Whether to open the underlying transports of a client before trying to
87    * execute a method if they are not open. This is usually desirable
88    * because it allows e.g. to automatically reconnect to a remote server
89    * if the network connection is dropped.
90    *
91    * Defaults to true.
92    */
93   bool reopenTransports() const @property;
94   void reopenTransports(bool) @property; /// Ditto
95 }
96 
97 immutable bool delegate(Exception) defaultRpcFaultFilter;
98 shared static this() {
99   defaultRpcFaultFilter = (Exception e) {
100     import thrift.protocol.base;
101     import thrift.transport.base;
102     return (
103       (cast(TTransportException)e !is null) ||
104       (cast(TApplicationException)e !is null)
105     );
106   };
107 }
108 
109 /**
110  * A TAsyncClientPoolBase implementation which queries multiple servers in a
111  * row until a request succeeds, the result of which is then returned.
112  *
113  * The definition of »success« can be customized using the rpcFaultFilter()
114  * delegate property. If it is non-null and calling it for an exception set by
115  * a failed method invocation returns true, the error is considered to be
116  * caused by the RPC layer rather than the application layer, and the next
117  * server in the pool is tried. If there are no more clients to try, the
118  * operation is marked as failed with a TCompoundOperationException.
119  *
120  * If a TAsyncClient in the pool fails with an RPC exception for a number of
121  * consecutive tries, it is temporarily disabled (not tried any longer) for
122  * a certain duration. Both the limit and the timeout can be configured. If all
123  * clients fail (and keepTrying is false), the operation fails with a
124  * TCompoundOperationException which contains the collected RPC exceptions.
125  */
126 final class TAsyncClientPool(Interface) if (isService!Interface) :
127   TAsyncClientPoolBase!Interface
128 {
129   ///
130   this(Client[] clients) {
131     pool_ = new TResourcePool!Client(clients);
132     rpcFaultFilter_ = defaultRpcFaultFilter;
133     reopenTransports_ = true;
134   }
135 
136   /+override+/ void addClient(Client client) {
137     pool_.add(client);
138   }
139 
140   /+override+/ bool removeClient(Client client) {
141     return pool_.remove(client);
142   }
143 
144   /**
145    * Whether to keep trying to find a working client if all have failed in a
146    * row.
147    *
148    * Defaults to false.
149    */
150   bool keepTrying() const @property {
151     return pool_.cycle;
152   }
153 
154   /// Ditto
155   void keepTrying(bool value) @property {
156     pool_.cycle = value;
157   }
158 
159   /**
160    * Whether to use a random permutation of the client pool on every call to
161    * execute(). This can be used e.g. as a simple form of load balancing.
162    *
163    * Defaults to true.
164    */
165   bool permuteClients() const @property {
166     return pool_.permute;
167   }
168 
169   /// Ditto
170   void permuteClients(bool value) @property {
171     pool_.permute = value;
172   }
173 
174   /**
175    * The number of consecutive faults after which a client is disabled until
176    * faultDisableDuration has passed. 0 to never disable clients.
177    *
178    * Defaults to 0.
179    */
180   ushort faultDisableCount() const @property {
181     return pool_.faultDisableCount;
182   }
183 
184   /// Ditto
185   void faultDisableCount(ushort value) @property {
186     pool_.faultDisableCount = value;
187   }
188 
189   /**
190    * The duration for which a client is no longer considered after it has
191    * failed too often.
192    *
193    * Defaults to one second.
194    */
195   Duration faultDisableDuration() const @property {
196     return pool_.faultDisableDuration;
197   }
198 
199   /// Ditto
200   void faultDisableDuration(Duration value) @property {
201     pool_.faultDisableDuration = value;
202   }
203 
204   /+override+/ bool delegate(Exception) rpcFaultFilter() const @property {
205     return rpcFaultFilter_;
206   }
207 
208   /+override+/ void rpcFaultFilter(bool delegate(Exception) value) @property {
209     rpcFaultFilter_ = value;
210   }
211 
212   /+override+/ bool reopenTransports() const @property {
213     return reopenTransports_;
214   }
215 
216   /+override+/ void reopenTransports(bool value) @property {
217     reopenTransports_ = value;
218   }
219 
220   mixin(fallbackPoolForwardCode!Interface());
221 
222 protected:
223   // The actual worker implementation to which RPC method calls are forwarded.
224   auto executeOnPool(string method, Args...)(Args args,
225     TCancellation cancellation
226   ) {
227     auto clients = pool_[];
228     if (clients.empty) {
229       throw new TException("No clients available to try.");
230     }
231 
232     auto promise = new TPromise!(ReturnType!(MemberType!(Interface, method)));
233     Exception[] rpcExceptions;
234 
235     void tryNext() {
236       while (clients.empty) {
237         Client next;
238         Duration waitTime;
239         if (clients.willBecomeNonempty(next, waitTime)) {
240           if (waitTime > dur!"hnsecs"(0)) {
241             if (waitTime < dur!"usecs"(10)) {
242               import core.thread;
243               Thread.sleep(waitTime);
244             } else {
245               next.transport.asyncManager.delay(waitTime, { tryNext(); });
246               return;
247             }
248           }
249         } else {
250           promise.fail(new TCompoundOperationException("All clients failed.",
251             rpcExceptions));
252           return;
253         }
254       }
255 
256       auto client = clients.front;
257       clients.popFront;
258 
259       if (reopenTransports) {
260         if (!client.transport.isOpen) {
261           try {
262             client.transport.open();
263           } catch (Exception e) {
264             pool_.recordFault(client);
265             tryNext();
266             return;
267           }
268         }
269       }
270 
271       auto future = mixin("client." ~ method)(args, cancellation);
272       future.completion.addCallback({
273         if (future.status == TFutureStatus.CANCELLED) {
274           promise.cancel();
275           return;
276         }
277 
278         auto e = future.getException();
279         if (e) {
280           if (rpcFaultFilter_ && rpcFaultFilter_(e)) {
281             pool_.recordFault(client);
282             rpcExceptions ~= e;
283             tryNext();
284             return;
285           }
286         }
287         pool_.recordSuccess(client);
288         promise.complete(future);
289       });
290     }
291 
292     tryNext();
293     return promise;
294   }
295 
296 private:
297   TResourcePool!Client pool_;
298   bool delegate(Exception) rpcFaultFilter_;
299   bool reopenTransports_;
300 }
301 
302 /**
303  * TAsyncClientPool construction helper to avoid having to explicitly
304  * specify the interface type, i.e. to allow the constructor being called
305  * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
306  */
307 TAsyncClientPool!Interface tAsyncClientPool(Interface)(
308   TAsyncClientBase!Interface[] clients
309 ) if (isService!Interface) {
310   return new typeof(return)(clients);
311 }
312 
313 private {
314   // Cannot use an anonymous delegate literal for this because they aren't
315   // allowed in class scope.
316   string fallbackPoolForwardCode(Interface)() {
317     string code = "";
318 
319     foreach (methodName; AllMemberMethodNames!Interface) {
320       enum qn = "Interface." ~ methodName;
321       code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~
322         "(ParameterTypeTuple!(" ~ qn ~ ") args, TCancellation cancellation = null) {\n";
323       code ~= "return executeOnPool!(`" ~ methodName ~ "`)(args, cancellation);\n";
324       code ~= "}\n";
325     }
326 
327     return code;
328   }
329 }
330 
331 /**
332  * A TAsyncClientPoolBase implementation which queries multiple servers at
333  * the same time and returns the first success response.
334  *
335  * The definition of »success« can be customized using the rpcFaultFilter()
336  * delegate property. If it is non-null and calling it for an exception set by
337  * a failed method invocation returns true, the error is considered to be
338  * caused by the RPC layer rather than the application layer, and the next
339  * server in the pool is tried. If all clients fail, the operation is marked
340  * as failed with a TCompoundOperationException.
341  */
342 final class TAsyncFastestClientPool(Interface) if (isService!Interface) :
343   TAsyncClientPoolBase!Interface
344 {
345   ///
346   this(Client[] clients) {
347     clients_ = clients;
348     rpcFaultFilter_ = defaultRpcFaultFilter;
349     reopenTransports_ = true;
350   }
351 
352   /+override+/ void addClient(Client client) {
353     clients_ ~= client;
354   }
355 
356   /+override+/ bool removeClient(Client client) {
357     auto oldLength = clients_.length;
358     clients_ = removeEqual(clients_, client);
359     return clients_.length < oldLength;
360   }
361 
362 
363   /+override+/ bool delegate(Exception) rpcFaultFilter() const @property {
364     return rpcFaultFilter_;
365   }
366 
367   /+override+/ void rpcFaultFilter(bool delegate(Exception) value) @property {
368     rpcFaultFilter_ = value;
369   }
370 
371   /+override+/bool reopenTransports() const @property {
372     return reopenTransports_;
373   }
374 
375   /+override+/ void reopenTransports(bool value) @property {
376     reopenTransports_ = value;
377   }
378 
379   mixin(fastestPoolForwardCode!Interface());
380 
381 private:
382   Client[] clients_;
383   bool delegate(Exception) rpcFaultFilter_;
384   bool reopenTransports_;
385 }
386 
387 /**
388  * TAsyncFastestClientPool construction helper to avoid having to explicitly
389  * specify the interface type, i.e. to allow the constructor being called
390  * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
391  */
392 TAsyncFastestClientPool!Interface tAsyncFastestClientPool(Interface)(
393   TAsyncClientBase!Interface[] clients
394 ) if (isService!Interface) {
395   return new typeof(return)(clients);
396 }
397 
398 private {
399   // Cannot use an anonymous delegate literal for this because they aren't
400   // allowed in class scope.
401   string fastestPoolForwardCode(Interface)() {
402     string code = "";
403 
404     foreach (methodName; AllMemberMethodNames!Interface) {
405       enum qn = "Interface." ~ methodName;
406       code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~
407         "(ParameterTypeTuple!(" ~ qn ~ ") args, " ~
408         "TCancellation cancellation = null) {\n";
409       code ~= "enum methodName = `" ~ methodName ~ "`;\n";
410       code ~= q{
411         alias ReturnType!(MemberType!(Interface, methodName)) ResultType;
412 
413         auto childCancellation = new TCancellationOrigin;
414 
415         TFuture!ResultType[] futures;
416         futures.reserve(clients_.length);
417 
418         foreach (c; clients_) {
419           if (reopenTransports) {
420             if (!c.transport.isOpen) {
421               try {
422                 c.transport.open();
423               } catch (Exception e) {
424                 continue;
425               }
426             }
427           }
428           futures ~= mixin("c." ~ methodName)(args, childCancellation);
429         }
430 
431         return new FastestPoolJob!(ResultType)(
432           futures, rpcFaultFilter, cancellation, childCancellation);
433       };
434       code ~= "}\n";
435     }
436 
437     return code;
438   }
439 
440   final class FastestPoolJob(Result) : TFuture!Result {
441     this(TFuture!Result[] poolFutures, bool delegate(Exception) rpcFaultFilter,
442       TCancellation cancellation, TCancellationOrigin childCancellation
443     ) {
444       resultPromise_ = new TPromise!Result;
445       poolFutures_ = poolFutures;
446       rpcFaultFilter_ = rpcFaultFilter;
447       childCancellation_ = childCancellation;
448 
449       foreach (future; poolFutures) {
450         future.completion.addCallback({
451           auto f = future;
452           return { completionCallback(f); };
453         }());
454         if (future.status != TFutureStatus.RUNNING) {
455           // If the current future is already completed, we are done, don't
456           // bother adding callbacks for the others (they would just return
457           // immediately after acquiring the lock).
458           return;
459         }
460       }
461 
462       if (cancellation) {
463         cancellation.triggering.addCallback({
464           resultPromise_.cancel();
465           childCancellation.trigger();
466         });
467       }
468     }
469 
470     TFutureStatus status() const @property {
471       return resultPromise_.status;
472     }
473 
474     TAwaitable completion() @property {
475       return resultPromise_.completion;
476     }
477 
478     Result get() {
479       return resultPromise_.get();
480     }
481 
482     Exception getException() {
483       return resultPromise_.getException();
484     }
485 
486   private:
487     void completionCallback(TFuture!Result future) {
488       synchronized {
489         if (future.status == TFutureStatus.CANCELLED) {
490           assert(resultPromise_.status != TFutureStatus.RUNNING);
491           return;
492         }
493 
494         if (resultPromise_.status != TFutureStatus.RUNNING) {
495           // The operation has already been completed. This can happen if
496           // another client completed first, but this callback was already
497           // waiting for the lock when it called cancel().
498           return;
499         }
500 
501         if (future.status == TFutureStatus.FAILED) {
502           auto e = future.getException();
503           if (rpcFaultFilter_ && rpcFaultFilter_(e)) {
504             rpcExceptions_ ~= e;
505 
506             if (rpcExceptions_.length == poolFutures_.length) {
507               resultPromise_.fail(new TCompoundOperationException(
508                 "All child operations failed, unable to retrieve a result.",
509                 rpcExceptions_
510               ));
511             }
512 
513             return;
514           }
515         }
516 
517         // Store the result to the target promise.
518         resultPromise_.complete(future);
519 
520         // Cancel the other futures, we would just discard their results.
521         // Note: We do this after we have stored the results to our promise,
522         // see the assert at the top of the function.
523         childCancellation_.trigger();
524       }
525     }
526 
527     TPromise!Result resultPromise_;
528     TFuture!Result[] poolFutures_;
529     Exception[] rpcExceptions_;
530     bool delegate(Exception) rpcFaultFilter_;
531     TCancellationOrigin childCancellation_;
532   }
533 }
534 
535 /**
536  * Allows easily aggregating results from a number of TAsyncClients.
537  *
538  * Contrary to TAsync{Fallback, Fastest}ClientPool, this class does not
539  * simply implement TFutureInterface!Interface. It manages a pool of clients,
540  * but allows the user to specify a custom accumulator function to use or to
541  * iterate over the results using a TFutureAggregatorRange.
542  *
543  * For each service method, TAsyncAggregator offers a method
544  * accepting the same arguments, and an optional TCancellation instance, just
545  * like with TFutureInterface. The return type, however, is a proxy object
546  * that offers the following methods:
547  * ---
548  * /++
549  *  + Returns a thrift.util.future.TFutureAggregatorRange for the results of
550  *  + the client pool method invocations.
551  *  +
552  *  + The [] (slicing) operator can also be used to obtain the range.
553  *  +
554  *  + Params:
555  *  +   timeout = A timeout to pass to the TFutureAggregatorRange constructor,
556  *  +     defaults to zero (no timeout).
557  *  +/
558  * TFutureAggregatorRange!ReturnType range(Duration timeout = dur!"hnsecs"(0));
559  * auto opSlice() { return range(); } /// Ditto
560  *
561  * /++
562  *  + Returns a future that gathers the results from the clients in the pool
563  *  + and invokes a user-supplied accumulator function on them, returning its
564  *  + return value to the client.
565  *  +
566  *  + In addition to the TFuture!AccumulatedType interface (where
567  *  + AccumulatedType is the return type of the accumulator function), the
568  *  + returned object also offers two additional methods, finish() and
569  *  + finishGet(): By default, the accumulator functions is called after all
570  *  + the results from the pool clients have become available. Calling finish()
571  *  + causes the accumulator future to stop waiting for other results and
572  *  + immediately invoking the accumulator function on the results currently
573  *  + available. If all results are already available, finish() is a no-op.
574  *  + finishGet() is a convenience shortcut for combining it with
575  *  + a call to get() immediately afterwards, like waitGet() is for wait().
576  *  +
577  *  + The acc alias can point to any callable accepting either an array of
578  *  + return values or an array of return values and an array of exceptions;
579  *  + see isAccumulator!() for details. The default accumulator concatenates
580  *  + return values that can be concatenated with each others (e.g. arrays),
581  *  + and simply returns an array of values otherwise, failing with a
582  *  + TCompoundOperationException no values were returned.
583  *  +
584  *  + The accumulator function is not executed in any of the async manager
585  *  + worker threads associated with the async clients, but instead it is
586  *  + invoked when the actual result is requested for the first time after the
587  *  + operation has been completed. This also includes checking the status
588  *  + of the operation once it is no longer running, since the accumulator
589  *  + has to be run to determine whether the operation succeeded or failed.
590  *  +/
591  * auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!acc);
592  * ---
593  *
594  * Example:
595  * ---
596  * // Some Thrift service.
597  * interface Foo {
598  *   int foo(string name);
599  *   byte[] bar();
600  * }
601  *
602  * // Create the aggregator pool – client0, client1, client2 are some
603  * // TAsyncClient!Foo instances, but in theory could also be other
604  * // TFutureInterface!Foo implementations (e.g. some async client pool).
605  * auto pool = new TAsyncAggregator!Foo([client0, client1, client2]);
606  *
607  * foreach (val; pool.foo("baz").range(dur!"seconds"(1))) {
608  *   // Process all the results that are available before a second has passed,
609  *   // in the order they arrive.
610  *   writeln(val);
611  * }
612  *
613  * auto sumRoots = pool.foo("baz").accumulate!((int[] vals, Exceptions[] exs){
614  *   if (vals.empty) {
615  *     throw new TCompoundOperationException("All clients failed", exs);
616  *   }
617  *
618  *   // Just to illustrate that the type of the values can change, convert the
619  *   // numbers to double and sum up their roots.
620  *   double result = 0;
621  *   foreach (v; vals) result += sqrt(cast(double)v);
622  *   return result;
623  * })();
624  *
625  * // Wait up to three seconds for the result, and then accumulate what has
626  * // arrived so far.
627  * sumRoots.completion.wait(dur!"seconds"(3));
628  * writeln(sumRoots.finishGet());
629  *
630  * // For scalars, the default accumulator returns an array of the values.
631  * pragma(msg, typeof(pool.foo("").accumulate().get()); // int[].
632  *
633  * // For lists, etc., it concatenates the results together.
634  * pragma(msg, typeof(pool.bar().accumulate().get())); // byte[].
635  * ---
636  *
637  * Note: For the accumulate!() interface, you might currently hit a »cannot use
638  * local '…' as parameter to non-global template accumulate«-error, see
639  * $(DMDBUG 5710, DMD issue 5710). If your accumulator function does not need
640  * to access the surrounding scope, you might want to use a function literal
641  * instead of a delegate to avoid the issue.
642  */
643 class TAsyncAggregator(Interface) if (isBaseService!Interface) {
644   /// Shorthand for the client type this instance operates on.
645   alias TAsyncClientBase!Interface Client;
646 
647   ///
648   this(Client[] clients) {
649     clients_ = clients;
650   }
651 
652   /// Whether to open the underlying transports of a client before trying to
653   /// execute a method if they are not open. This is usually desirable
654   /// because it allows e.g. to automatically reconnect to a remote server
655   /// if the network connection is dropped.
656   ///
657   /// Defaults to true.
658   bool reopenTransports = true;
659 
660   mixin AggregatorOpDispatch!();
661 
662 private:
663   Client[] clients_;
664 }
665 
666 /// Ditto
667 class TAsyncAggregator(Interface) if (isDerivedService!Interface) :
668   TAsyncAggregator!(BaseService!Interface)
669 {
670   /// Shorthand for the client type this instance operates on.
671   alias TAsyncClientBase!Interface Client;
672 
673   ///
674   this(Client[] clients) {
675     super(cast(TAsyncClientBase!(BaseService!Interface)[])clients);
676   }
677 
678   mixin AggregatorOpDispatch!();
679 }
680 
681 /**
682  * Whether fun is a valid accumulator function for values of type ValueType.
683  *
684  * For this to be true, fun must be a callable matching one of the following
685  * argument lists:
686  * ---
687  * fun(ValueType[] values);
688  * fun(ValueType[] values, Exception[] exceptions);
689  * ---
690  *
691  * The second version is passed the collected array exceptions from all the
692  * clients in the pool.
693  *
694  * The return value of the accumulator function is passed to the client (via
695  * the result future). If it throws an exception, the operation is marked as
696  * failed with the given exception instead.
697  */
698 template isAccumulator(ValueType, alias fun) {
699   enum isAccumulator = is(typeof(fun(cast(ValueType[])[]))) ||
700     is(typeof(fun(cast(ValueType[])[], cast(Exception[])[])));
701 }
702 
703 /**
704  * TAsyncAggregator construction helper to avoid having to explicitly
705  * specify the interface type, i.e. to allow the constructor being called
706  * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
707  */
708 TAsyncAggregator!Interface tAsyncAggregator(Interface)(
709   TAsyncClientBase!Interface[] clients
710 ) if (isService!Interface) {
711   return new typeof(return)(clients);
712 }
713 
714 private {
715   mixin template AggregatorOpDispatch() {
716     auto opDispatch(string name, Args...)(Args args) if (
717       is(typeof(mixin("Interface.init." ~ name)(args)))
718     ) {
719       alias ReturnType!(MemberType!(Interface, name)) ResultType;
720 
721       auto childCancellation = new TCancellationOrigin;
722 
723       TFuture!ResultType[] futures;
724       futures.reserve(clients_.length);
725 
726       foreach (c; cast(Client[])clients_) {
727         if (reopenTransports) {
728           if (!c.transport.isOpen) {
729             try {
730               c.transport.open();
731             } catch (Exception e) {
732               continue;
733             }
734           }
735         }
736         futures ~= mixin("c." ~ name)(args, childCancellation);
737       }
738 
739       return AggregationResult!ResultType(futures, childCancellation);
740     }
741   }
742 
743   struct AggregationResult(T) {
744     auto opSlice() {
745       return range();
746     }
747 
748     auto range(Duration timeout = dur!"hnsecs"(0)) {
749       return tFutureAggregatorRange(futures_, childCancellation_, timeout);
750     }
751 
752     auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!(T, acc)) {
753       return new AccumulatorJob!(T, acc)(futures_, childCancellation_);
754     }
755 
756   private:
757     TFuture!T[] futures_;
758     TCancellationOrigin childCancellation_;
759   }
760 
761   auto defaultAccumulator(T)(T[] values, Exception[] exceptions) {
762     if (values.empty) {
763       throw new TCompoundOperationException("All clients failed",
764         exceptions);
765     }
766 
767     static if (is(typeof(T.init ~ T.init))) {
768       import std.algorithm;
769       return reduce!"a ~ b"(values);
770     } else {
771       return values;
772     }
773   }
774 
775   final class AccumulatorJob(T, alias accumulator) if (
776     isAccumulator!(T, accumulator)
777   ) : TFuture!(AccumulatorResult!(T, accumulator)) {
778     this(TFuture!T[] futures, TCancellationOrigin childCancellation) {
779       futures_ = futures;
780       childCancellation_ = childCancellation;
781       resultMutex_ = new Mutex;
782       completionEvent_ = new TOneshotEvent;
783 
784       foreach (future; futures) {
785         future.completion.addCallback({
786           auto f = future;
787           return {
788             synchronized (resultMutex_) {
789               if (f.status == TFutureStatus.CANCELLED) {
790                 if (!finished_) {
791                   status_ = TFutureStatus.CANCELLED;
792                   finished_ = true;
793                 }
794                 return;
795               }
796 
797               if (f.status == TFutureStatus.FAILED) {
798                 exceptions_ ~= f.getException();
799               } else {
800                 results_ ~= f.get();
801               }
802 
803               if (results_.length + exceptions_.length == futures_.length) {
804                 finished_ = true;
805                 completionEvent_.trigger();
806               }
807             }
808           };
809         }());
810       }
811     }
812 
813     TFutureStatus status() @property {
814       synchronized (resultMutex_) {
815         if (!finished_) return TFutureStatus.RUNNING;
816         if (status_ != TFutureStatus.RUNNING) return status_;
817 
818         try {
819           result_ = invokeAccumulator!accumulator(results_, exceptions_);
820           status_ = TFutureStatus.SUCCEEDED;
821         } catch (Exception e) {
822           exception_ = e;
823           status_ = TFutureStatus.FAILED;
824         }
825 
826         return status_;
827       }
828     }
829 
830     TAwaitable completion() @property {
831       return completionEvent_;
832     }
833 
834     AccumulatorResult!(T, accumulator) get() {
835       auto s = status;
836 
837       enforce(s != TFutureStatus.RUNNING,
838         new TFutureException("Operation not yet completed."));
839 
840       if (s == TFutureStatus.CANCELLED) throw new TCancelledException;
841       if (s == TFutureStatus.FAILED) throw exception_;
842       return result_;
843     }
844 
845     Exception getException() {
846       auto s = status;
847       enforce(s != TFutureStatus.RUNNING,
848         new TFutureException("Operation not yet completed."));
849 
850       if (s == TFutureStatus.CANCELLED) throw new TCancelledException;
851 
852       if (s == TFutureStatus.SUCCEEDED) {
853         return null;
854       }
855       return exception_;
856     }
857 
858     void finish() {
859       synchronized (resultMutex_) {
860         if (!finished_) {
861           finished_ = true;
862           childCancellation_.trigger();
863           completionEvent_.trigger();
864         }
865       }
866     }
867 
868     auto finishGet() {
869       finish();
870       return get();
871     }
872 
873   private:
874     TFuture!T[] futures_;
875     TCancellationOrigin childCancellation_;
876 
877     bool finished_;
878     T[] results_;
879     Exception[] exceptions_;
880 
881     TFutureStatus status_;
882     Mutex resultMutex_;
883     union {
884       AccumulatorResult!(T, accumulator) result_;
885       Exception exception_;
886     }
887     TOneshotEvent completionEvent_;
888   }
889 
890   auto invokeAccumulator(alias accumulator, T)(
891     T[] values, Exception[] exceptions
892   ) if (
893     isAccumulator!(T, accumulator)
894   ) {
895     static if (is(typeof(accumulator(values, exceptions)))) {
896       return accumulator(values, exceptions);
897     } else {
898       return accumulator(values);
899     }
900   }
901 
902   template AccumulatorResult(T, alias acc) {
903     alias typeof(invokeAccumulator!acc(cast(T[])[], cast(Exception[])[]))
904       AccumulatorResult;
905   }
906 }