TAsyncAggregator

Allows easily aggregating results from a number of TAsyncClients.

Unlike TAsync{Fallback, Fastest}ClientPool, this class does not simply implement TFutureInterface!Interface. It manages a pool of clients, but lets the user either specify a custom accumulator function or iterate over the results using a TFutureAggregatorRange.

For each service method, TAsyncAggregator offers a method accepting the same arguments, and an optional TCancellation instance, just like with TFutureInterface. The return type, however, is a proxy object that offers the following methods:

/++
 + Returns a thrift.util.future.TFutureAggregatorRange for the results of
 + the client pool method invocations.
 +
 + The [] (slicing) operator can also be used to obtain the range.
 +
 + Params:
 +   timeout = A timeout to pass to the TFutureAggregatorRange constructor,
 +     defaults to zero (no timeout).
 +/
TFutureAggregatorRange!ReturnType range(Duration timeout = dur!"hnsecs"(0));
auto opSlice() { return range(); } /// Ditto

/++
 + Returns a future that gathers the results from the clients in the pool
 + and invokes a user-supplied accumulator function on them, returning its
 + return value to the client.
 +
 + In addition to the TFuture!AccumulatedType interface (where
 + AccumulatedType is the return type of the accumulator function), the
 + returned object also offers two additional methods, finish() and
 + finishGet(): By default, the accumulator function is called after all
 + the results from the pool clients have become available. Calling finish()
 + causes the accumulator future to stop waiting for other results and
 + immediately invoke the accumulator function on the results currently
 + available. If all results are already available, finish() is a no-op.
 + finishGet() is a convenience shortcut for calling finish() followed
 + immediately by get(), just as waitGet() is for wait().
 +
 + The acc alias can point to any callable accepting either an array of
 + return values or an array of return values and an array of exceptions;
 + see isAccumulator!() for details. The default accumulator concatenates
 + return values that can be concatenated with each other (e.g. arrays),
 + and simply returns an array of values otherwise, failing with a
 + TCompoundOperationException if no values were returned.
 +
 + The accumulator function is not executed in any of the async manager
 + worker threads associated with the async clients, but instead it is
 + invoked when the actual result is requested for the first time after the
 + operation has been completed. This also includes checking the status
 + of the operation once it is no longer running, since the accumulator
 + has to be run to determine whether the operation succeeded or failed.
 +/
auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!acc);
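
The behaviour described for the default accumulator can be illustrated with a minimal, Thrift-free sketch. The name sketchAccumulate is hypothetical, the function takes only the values array, and a plain Exception stands in for TCompoundOperationException; this is not the library's implementation of defaultAccumulator, only an approximation of the documented rules.

```d
import std.traits : isDynamicArray;

// Hypothetical sketch; not the Thrift implementation of defaultAccumulator.
auto sketchAccumulate(T)(T[] values) {
  if (values.length == 0) {
    // The real accumulator is documented to fail with a
    // TCompoundOperationException; a plain Exception stands in here.
    throw new Exception("no values were returned");
  }
  static if (isDynamicArray!T) {
    // Concatenable results (e.g. byte[]) are joined into one array.
    T joined;
    foreach (v; values) joined ~= v;
    return joined;
  } else {
    // Scalars are returned as an array of the individual values.
    return values;
  }
}
```

Under these assumptions, sketchAccumulate([1, 2, 3]) returns the values unchanged as an int[], while sketchAccumulate([[1, 2], [3]]) joins the per-client arrays into a single int[].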

Constructors

this
this(Client[] clients)

Members

Aliases

Client
alias Client = TAsyncClientBase!Interface

Shorthand for the client type this instance operates on.

Mixins

__anonymous
mixin AggregatorOpDispatch!()
Undocumented in source.

Variables

reopenTransports
bool reopenTransports;

Whether to open the underlying transports of a client before trying to execute a method if they are not already open. This is usually desirable because it allows, for example, automatically reconnecting to a remote server after the network connection has been dropped.

Mixed In Members

From mixin AggregatorOpDispatch!()

opDispatch
auto opDispatch(Args args)
Undocumented in source. Be warned that the author may not have intended to support it.

Examples

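import core.time : dur;
import std.math : sqrt;
import std.range : empty;
import std.stdio : writeln;

// Some Thrift service.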
interface Foo {
  int foo(string name);
  byte[] bar();
}

// Create the aggregator pool – client0, client1, client2 are some
// TAsyncClient!Foo instances, but in theory could also be other
// TFutureInterface!Foo implementations (e.g. some async client pool).
auto pool = new TAsyncAggregator!Foo([client0, client1, client2]);

foreach (val; pool.foo("baz").range(dur!"seconds"(1))) {
  // Process all the results that are available before a second has passed,
  // in the order they arrive.
  writeln(val);
}

auto sumRoots = pool.foo("baz").accumulate!((int[] vals, Exception[] exs){
  if (vals.empty) {
    throw new TCompoundOperationException("All clients failed", exs);
  }

  // Just to illustrate that the type of the values can change, convert the
  // numbers to double and sum up their roots.
  double result = 0;
  foreach (v; vals) result += sqrt(cast(double)v);
  return result;
})();

// Wait up to three seconds for the result, and then accumulate what has
// arrived so far.
sumRoots.completion.wait(dur!"seconds"(3));
writeln(sumRoots.finishGet());

// For scalars, the default accumulator returns an array of the values.
pragma(msg, typeof(pool.foo("").accumulate().get())); // int[].

// For lists, etc., it concatenates the results together.
pragma(msg, typeof(pool.bar().accumulate().get())); // byte[].

Note: With the accumulate!() interface, you might currently hit a »cannot use local '…' as parameter to non-global template accumulate« error; see DMD issue 5710. If your accumulator function does not need to access the surrounding scope, use a function literal instead of a delegate to avoid the issue.
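
The difference between the two literal forms can be sketched without Thrift. In the snippet below, applyAcc is a hypothetical stand-in for a template that takes its callable as an alias parameter, as accumulate!() does; it is a global template, so it shows only the function-literal syntax and does not reproduce the compiler error itself.

```d
// Hypothetical stand-in for a template with an alias callable parameter.
auto applyAcc(alias acc)(int[] vals) {
  return acc(vals);
}

int sumWithFunctionLiteral(int[] input) {
  // The explicit `function` keyword yields a literal without a context
  // pointer, so it cannot refer to locals of the enclosing scope.
  return applyAcc!(function(int[] vals) {
    int total = 0;
    foreach (v; vals) total += v;
    return total;
  })(input);
}
```

Because the literal is marked `function`, any accidental use of a local variable inside it is rejected at compile time, which makes it clear that the accumulator is independent of the surrounding scope.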

Meta