TAsyncAggregator

Allows easily aggregating results from a number of TAsyncClients.

Contrary to TAsync{Fallback, Fastest}ClientPool, this class does not simply implement TFutureInterface!Interface. It manages a pool of clients, but allows the user to specify a custom accumulator function to use or to iterate over the results using a TFutureAggregatorRange.

For each service method, TAsyncAggregator offers a method accepting the same arguments, and an optional TCancellation instance, just like with TFutureInterface. The return type, however, is a proxy object that offers the following methods:

/++
 + Returns a thrift.util.future.TFutureAggregatorRange for the results of
 + the client pool method invocations.
 +
 + The [] (slicing) operator can also be used to obtain the range.
 +
 + Params:
 +   timeout = A timeout to pass to the TFutureAggregatorRange constructor,
 +     defaults to zero (no timeout).
 +/
TFutureAggregatorRange!ReturnType range(Duration timeout = dur!"hnsecs"(0));
auto opSlice() { return range(); } /// Ditto

/++
 + Returns a future that gathers the results from the clients in the pool
 + and invokes a user-supplied accumulator function on them, returning its
 + return value to the client.
 +
 + In addition to the TFuture!AccumulatedType interface (where
 + AccumulatedType is the return type of the accumulator function), the
 + returned object also offers two additional methods, finish() and
 + finishGet(): By default, the accumulator functions is called after all
 + the results from the pool clients have become available. Calling finish()
 + causes the accumulator future to stop waiting for other results and
 + immediately invoking the accumulator function on the results currently
 + available. If all results are already available, finish() is a no-op.
 + finishGet() is a convenience shortcut for combining it with
 + a call to get() immediately afterwards, like waitGet() is for wait().
 +
 + The acc alias can point to any callable accepting either an array of
 + return values or an array of return values and an array of exceptions;
 + see isAccumulator!() for details. The default accumulator concatenates
 + return values that can be concatenated with each others (e.g. arrays),
 + and simply returns an array of values otherwise, failing with a
 + TCompoundOperationException no values were returned.
 +
 + The accumulator function is not executed in any of the async manager
 + worker threads associated with the async clients, but instead it is
 + invoked when the actual result is requested for the first time after the
 + operation has been completed. This also includes checking the status
 + of the operation once it is no longer running, since the accumulator
 + has to be run to determine whether the operation succeeded or failed.
 +/
auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!acc);

Constructors

this
this(Client[] clients)

Members

Aliases

Client
alias Client = TAsyncClientBase!Interface

Shorthand for the client type this instance operates on.

Mixins

__anonymous
mixin AggregatorOpDispatch!()
Undocumented in source.

Variables

reopenTransports
bool reopenTransports;

Whether to open the underlying transports of a client before trying to execute a method if they are not open. This is usually desirable because it allows e.g. to automatically reconnect to a remote server if the network connection is dropped.

Examples

// Some Thrift service.
interface Foo {
  int foo(string name);
  byte[] bar();
}

// Create the aggregator pool – client0, client1, client2 are some
// TAsyncClient!Foo instances, but in theory could also be other
// TFutureInterface!Foo implementations (e.g. some async client pool).
auto pool = new TAsyncAggregator!Foo([client0, client1, client2]);

foreach (val; pool.foo("baz").range(dur!"seconds"(1))) {
  // Process all the results that are available before a second has passed,
  // in the order they arrive.
  writeln(val);
}

auto sumRoots = pool.foo("baz").accumulate!((int[] vals, Exceptions[] exs){
  if (vals.empty) {
    throw new TCompoundOperationException("All clients failed", exs);
  }

  // Just to illustrate that the type of the values can change, convert the
  // numbers to double and sum up their roots.
  double result = 0;
  foreach (v; vals) result += sqrt(cast(double)v);
  return result;
})();

// Wait up to three seconds for the result, and then accumulate what has
// arrived so far.
sumRoots.completion.wait(dur!"seconds"(3));
writeln(sumRoots.finishGet());

// For scalars, the default accumulator returns an array of the values.
pragma(msg, typeof(pool.foo("").accumulate().get()); // int[].

// For lists, etc., it concatenates the results together.
pragma(msg, typeof(pool.bar().accumulate().get())); // byte[].

Note: For the accumulate!() interface, you might currently hit a »cannot use local '…' as parameter to non-global template accumulate«-error, see $(DMDBUG 5710, DMD issue 5710). If your accumulator function does not need to access the surrounding scope, you might want to use a function literal instead of a delegate to avoid the issue.

Meta