1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 /** 21 * Utilities for asynchronously querying multiple servers, building on 22 * TAsyncClient. 23 * 24 * Terminology note: The names of the artifacts defined in this module are 25 * derived from »client pool«, because they operate on a pool of 26 * TAsyncClients. However, from a architectural point of view, they often 27 * represent a pool of hosts a Thrift client application communicates with 28 * using RPC calls. 29 */ 30 module thrift.codegen.async_client_pool; 31 32 import core.sync.mutex; 33 import core.time : Duration, dur; 34 import std.algorithm : map; 35 import std.array : array, empty; 36 import std.exception : enforce; 37 import std.traits : ParameterTypeTuple, ReturnType; 38 import thrift.base; 39 import thrift.codegen.base; 40 import thrift.codegen.async_client; 41 import thrift.internal.algorithm; 42 import thrift.internal.codegen; 43 import thrift.util.awaitable; 44 import thrift.util.cancellation; 45 import thrift.util.future; 46 import thrift.internal.resource_pool; 47 48 /** 49 * Represents a generic client pool which implements TFutureInterface!Interface 50 * using multiple TAsyncClients. 51 */ 52 interface TAsyncClientPoolBase(Interface) if (isService!Interface) : 53 TFutureInterface!Interface 54 { 55 /// Shorthand for the client type this pool operates on. 56 alias TAsyncClientBase!Interface Client; 57 58 /** 59 * Adds a client to the pool. 60 */ 61 void addClient(Client client); 62 63 /** 64 * Removes a client from the pool. 65 * 66 * Returns: Whether the client was found in the pool. 67 */ 68 bool removeClient(Client client); 69 70 /** 71 * Called to determine whether an exception comes from a client from the 72 * pool not working properly, or if it an exception thrown at the 73 * application level. 74 * 75 * If the delegate returns true, the server/connection is considered to be 76 * at fault, if it returns false, the exception is just passed on to the 77 * caller. 78 * 79 * By default, returns true for instances of TTransportException and 80 * TApplicationException, false otherwise. 81 */ 82 bool delegate(Exception) rpcFaultFilter() const @property; 83 void rpcFaultFilter(bool delegate(Exception)) @property; /// Ditto 84 85 /** 86 * Whether to open the underlying transports of a client before trying to 87 * execute a method if they are not open. This is usually desirable 88 * because it allows e.g. to automatically reconnect to a remote server 89 * if the network connection is dropped. 90 * 91 * Defaults to true. 92 */ 93 bool reopenTransports() const @property; 94 void reopenTransports(bool) @property; /// Ditto 95 } 96 97 immutable bool delegate(Exception) defaultRpcFaultFilter; 98 shared static this() { 99 defaultRpcFaultFilter = (Exception e) { 100 import thrift.protocol.base; 101 import thrift.transport.base; 102 return ( 103 (cast(TTransportException)e !is null) || 104 (cast(TApplicationException)e !is null) 105 ); 106 }; 107 } 108 109 /** 110 * A TAsyncClientPoolBase implementation which queries multiple servers in a 111 * row until a request succeeds, the result of which is then returned. 112 * 113 * The definition of »success« can be customized using the rpcFaultFilter() 114 * delegate property. If it is non-null and calling it for an exception set by 115 * a failed method invocation returns true, the error is considered to be 116 * caused by the RPC layer rather than the application layer, and the next 117 * server in the pool is tried. If there are no more clients to try, the 118 * operation is marked as failed with a TCompoundOperationException. 119 * 120 * If a TAsyncClient in the pool fails with an RPC exception for a number of 121 * consecutive tries, it is temporarily disabled (not tried any longer) for 122 * a certain duration. Both the limit and the timeout can be configured. If all 123 * clients fail (and keepTrying is false), the operation fails with a 124 * TCompoundOperationException which contains the collected RPC exceptions. 125 */ 126 final class TAsyncClientPool(Interface) if (isService!Interface) : 127 TAsyncClientPoolBase!Interface 128 { 129 /// 130 this(Client[] clients) { 131 pool_ = new TResourcePool!Client(clients); 132 rpcFaultFilter_ = defaultRpcFaultFilter; 133 reopenTransports_ = true; 134 } 135 136 /+override+/ void addClient(Client client) { 137 pool_.add(client); 138 } 139 140 /+override+/ bool removeClient(Client client) { 141 return pool_.remove(client); 142 } 143 144 /** 145 * Whether to keep trying to find a working client if all have failed in a 146 * row. 147 * 148 * Defaults to false. 149 */ 150 bool keepTrying() const @property { 151 return pool_.cycle; 152 } 153 154 /// Ditto 155 void keepTrying(bool value) @property { 156 pool_.cycle = value; 157 } 158 159 /** 160 * Whether to use a random permutation of the client pool on every call to 161 * execute(). This can be used e.g. as a simple form of load balancing. 162 * 163 * Defaults to true. 164 */ 165 bool permuteClients() const @property { 166 return pool_.permute; 167 } 168 169 /// Ditto 170 void permuteClients(bool value) @property { 171 pool_.permute = value; 172 } 173 174 /** 175 * The number of consecutive faults after which a client is disabled until 176 * faultDisableDuration has passed. 0 to never disable clients. 177 * 178 * Defaults to 0. 179 */ 180 ushort faultDisableCount() const @property { 181 return pool_.faultDisableCount; 182 } 183 184 /// Ditto 185 void faultDisableCount(ushort value) @property { 186 pool_.faultDisableCount = value; 187 } 188 189 /** 190 * The duration for which a client is no longer considered after it has 191 * failed too often. 192 * 193 * Defaults to one second. 194 */ 195 Duration faultDisableDuration() const @property { 196 return pool_.faultDisableDuration; 197 } 198 199 /// Ditto 200 void faultDisableDuration(Duration value) @property { 201 pool_.faultDisableDuration = value; 202 } 203 204 /+override+/ bool delegate(Exception) rpcFaultFilter() const @property { 205 return rpcFaultFilter_; 206 } 207 208 /+override+/ void rpcFaultFilter(bool delegate(Exception) value) @property { 209 rpcFaultFilter_ = value; 210 } 211 212 /+override+/ bool reopenTransports() const @property { 213 return reopenTransports_; 214 } 215 216 /+override+/ void reopenTransports(bool value) @property { 217 reopenTransports_ = value; 218 } 219 220 mixin(fallbackPoolForwardCode!Interface()); 221 222 protected: 223 // The actual worker implementation to which RPC method calls are forwarded. 224 auto executeOnPool(string method, Args...)(Args args, 225 TCancellation cancellation 226 ) { 227 auto clients = pool_[]; 228 if (clients.empty) { 229 throw new TException("No clients available to try."); 230 } 231 232 auto promise = new TPromise!(ReturnType!(MemberType!(Interface, method))); 233 Exception[] rpcExceptions; 234 235 void tryNext() { 236 while (clients.empty) { 237 Client next; 238 Duration waitTime; 239 if (clients.willBecomeNonempty(next, waitTime)) { 240 if (waitTime > dur!"hnsecs"(0)) { 241 if (waitTime < dur!"usecs"(10)) { 242 import core.thread; 243 Thread.sleep(waitTime); 244 } else { 245 next.transport.asyncManager.delay(waitTime, { tryNext(); }); 246 return; 247 } 248 } 249 } else { 250 promise.fail(new TCompoundOperationException("All clients failed.", 251 rpcExceptions)); 252 return; 253 } 254 } 255 256 auto client = clients.front; 257 clients.popFront; 258 259 if (reopenTransports) { 260 if (!client.transport.isOpen) { 261 try { 262 client.transport.open(); 263 } catch (Exception e) { 264 pool_.recordFault(client); 265 tryNext(); 266 return; 267 } 268 } 269 } 270 271 auto future = mixin("client." ~ method)(args, cancellation); 272 future.completion.addCallback({ 273 if (future.status == TFutureStatus.CANCELLED) { 274 promise.cancel(); 275 return; 276 } 277 278 auto e = future.getException(); 279 if (e) { 280 if (rpcFaultFilter_ && rpcFaultFilter_(e)) { 281 pool_.recordFault(client); 282 rpcExceptions ~= e; 283 tryNext(); 284 return; 285 } 286 } 287 pool_.recordSuccess(client); 288 promise.complete(future); 289 }); 290 } 291 292 tryNext(); 293 return promise; 294 } 295 296 private: 297 TResourcePool!Client pool_; 298 bool delegate(Exception) rpcFaultFilter_; 299 bool reopenTransports_; 300 } 301 302 /** 303 * TAsyncClientPool construction helper to avoid having to explicitly 304 * specify the interface type, i.e. to allow the constructor being called 305 * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)). 306 */ 307 TAsyncClientPool!Interface tAsyncClientPool(Interface)( 308 TAsyncClientBase!Interface[] clients 309 ) if (isService!Interface) { 310 return new typeof(return)(clients); 311 } 312 313 private { 314 // Cannot use an anonymous delegate literal for this because they aren't 315 // allowed in class scope. 316 string fallbackPoolForwardCode(Interface)() { 317 string code = ""; 318 319 foreach (methodName; AllMemberMethodNames!Interface) { 320 enum qn = "Interface." ~ methodName; 321 code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~ 322 "(ParameterTypeTuple!(" ~ qn ~ ") args, TCancellation cancellation = null) {\n"; 323 code ~= "return executeOnPool!(`" ~ methodName ~ "`)(args, cancellation);\n"; 324 code ~= "}\n"; 325 } 326 327 return code; 328 } 329 } 330 331 /** 332 * A TAsyncClientPoolBase implementation which queries multiple servers at 333 * the same time and returns the first success response. 334 * 335 * The definition of »success« can be customized using the rpcFaultFilter() 336 * delegate property. If it is non-null and calling it for an exception set by 337 * a failed method invocation returns true, the error is considered to be 338 * caused by the RPC layer rather than the application layer, and the next 339 * server in the pool is tried. If all clients fail, the operation is marked 340 * as failed with a TCompoundOperationException. 341 */ 342 final class TAsyncFastestClientPool(Interface) if (isService!Interface) : 343 TAsyncClientPoolBase!Interface 344 { 345 /// 346 this(Client[] clients) { 347 clients_ = clients; 348 rpcFaultFilter_ = defaultRpcFaultFilter; 349 reopenTransports_ = true; 350 } 351 352 /+override+/ void addClient(Client client) { 353 clients_ ~= client; 354 } 355 356 /+override+/ bool removeClient(Client client) { 357 auto oldLength = clients_.length; 358 clients_ = removeEqual(clients_, client); 359 return clients_.length < oldLength; 360 } 361 362 363 /+override+/ bool delegate(Exception) rpcFaultFilter() const @property { 364 return rpcFaultFilter_; 365 } 366 367 /+override+/ void rpcFaultFilter(bool delegate(Exception) value) @property { 368 rpcFaultFilter_ = value; 369 } 370 371 /+override+/bool reopenTransports() const @property { 372 return reopenTransports_; 373 } 374 375 /+override+/ void reopenTransports(bool value) @property { 376 reopenTransports_ = value; 377 } 378 379 mixin(fastestPoolForwardCode!Interface()); 380 381 private: 382 Client[] clients_; 383 bool delegate(Exception) rpcFaultFilter_; 384 bool reopenTransports_; 385 } 386 387 /** 388 * TAsyncFastestClientPool construction helper to avoid having to explicitly 389 * specify the interface type, i.e. to allow the constructor being called 390 * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)). 391 */ 392 TAsyncFastestClientPool!Interface tAsyncFastestClientPool(Interface)( 393 TAsyncClientBase!Interface[] clients 394 ) if (isService!Interface) { 395 return new typeof(return)(clients); 396 } 397 398 private { 399 // Cannot use an anonymous delegate literal for this because they aren't 400 // allowed in class scope. 401 string fastestPoolForwardCode(Interface)() { 402 string code = ""; 403 404 foreach (methodName; AllMemberMethodNames!Interface) { 405 enum qn = "Interface." ~ methodName; 406 code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~ 407 "(ParameterTypeTuple!(" ~ qn ~ ") args, " ~ 408 "TCancellation cancellation = null) {\n"; 409 code ~= "enum methodName = `" ~ methodName ~ "`;\n"; 410 code ~= q{ 411 alias ReturnType!(MemberType!(Interface, methodName)) ResultType; 412 413 auto childCancellation = new TCancellationOrigin; 414 415 TFuture!ResultType[] futures; 416 futures.reserve(clients_.length); 417 418 foreach (c; clients_) { 419 if (reopenTransports) { 420 if (!c.transport.isOpen) { 421 try { 422 c.transport.open(); 423 } catch (Exception e) { 424 continue; 425 } 426 } 427 } 428 futures ~= mixin("c." ~ methodName)(args, childCancellation); 429 } 430 431 return new FastestPoolJob!(ResultType)( 432 futures, rpcFaultFilter, cancellation, childCancellation); 433 }; 434 code ~= "}\n"; 435 } 436 437 return code; 438 } 439 440 final class FastestPoolJob(Result) : TFuture!Result { 441 this(TFuture!Result[] poolFutures, bool delegate(Exception) rpcFaultFilter, 442 TCancellation cancellation, TCancellationOrigin childCancellation 443 ) { 444 resultPromise_ = new TPromise!Result; 445 poolFutures_ = poolFutures; 446 rpcFaultFilter_ = rpcFaultFilter; 447 childCancellation_ = childCancellation; 448 449 foreach (future; poolFutures) { 450 future.completion.addCallback({ 451 auto f = future; 452 return { completionCallback(f); }; 453 }()); 454 if (future.status != TFutureStatus.RUNNING) { 455 // If the current future is already completed, we are done, don't 456 // bother adding callbacks for the others (they would just return 457 // immediately after acquiring the lock). 458 return; 459 } 460 } 461 462 if (cancellation) { 463 cancellation.triggering.addCallback({ 464 resultPromise_.cancel(); 465 childCancellation.trigger(); 466 }); 467 } 468 } 469 470 TFutureStatus status() const @property { 471 return resultPromise_.status; 472 } 473 474 TAwaitable completion() @property { 475 return resultPromise_.completion; 476 } 477 478 Result get() { 479 return resultPromise_.get(); 480 } 481 482 Exception getException() { 483 return resultPromise_.getException(); 484 } 485 486 private: 487 void completionCallback(TFuture!Result future) { 488 synchronized { 489 if (future.status == TFutureStatus.CANCELLED) { 490 assert(resultPromise_.status != TFutureStatus.RUNNING); 491 return; 492 } 493 494 if (resultPromise_.status != TFutureStatus.RUNNING) { 495 // The operation has already been completed. This can happen if 496 // another client completed first, but this callback was already 497 // waiting for the lock when it called cancel(). 498 return; 499 } 500 501 if (future.status == TFutureStatus.FAILED) { 502 auto e = future.getException(); 503 if (rpcFaultFilter_ && rpcFaultFilter_(e)) { 504 rpcExceptions_ ~= e; 505 506 if (rpcExceptions_.length == poolFutures_.length) { 507 resultPromise_.fail(new TCompoundOperationException( 508 "All child operations failed, unable to retrieve a result.", 509 rpcExceptions_ 510 )); 511 } 512 513 return; 514 } 515 } 516 517 // Store the result to the target promise. 518 resultPromise_.complete(future); 519 520 // Cancel the other futures, we would just discard their results. 521 // Note: We do this after we have stored the results to our promise, 522 // see the assert at the top of the function. 523 childCancellation_.trigger(); 524 } 525 } 526 527 TPromise!Result resultPromise_; 528 TFuture!Result[] poolFutures_; 529 Exception[] rpcExceptions_; 530 bool delegate(Exception) rpcFaultFilter_; 531 TCancellationOrigin childCancellation_; 532 } 533 } 534 535 /** 536 * Allows easily aggregating results from a number of TAsyncClients. 537 * 538 * Contrary to TAsync{Fallback, Fastest}ClientPool, this class does not 539 * simply implement TFutureInterface!Interface. It manages a pool of clients, 540 * but allows the user to specify a custom accumulator function to use or to 541 * iterate over the results using a TFutureAggregatorRange. 542 * 543 * For each service method, TAsyncAggregator offers a method 544 * accepting the same arguments, and an optional TCancellation instance, just 545 * like with TFutureInterface. The return type, however, is a proxy object 546 * that offers the following methods: 547 * --- 548 * /++ 549 * + Returns a thrift.util.future.TFutureAggregatorRange for the results of 550 * + the client pool method invocations. 551 * + 552 * + The [] (slicing) operator can also be used to obtain the range. 553 * + 554 * + Params: 555 * + timeout = A timeout to pass to the TFutureAggregatorRange constructor, 556 * + defaults to zero (no timeout). 557 * +/ 558 * TFutureAggregatorRange!ReturnType range(Duration timeout = dur!"hnsecs"(0)); 559 * auto opSlice() { return range(); } /// Ditto 560 * 561 * /++ 562 * + Returns a future that gathers the results from the clients in the pool 563 * + and invokes a user-supplied accumulator function on them, returning its 564 * + return value to the client. 565 * + 566 * + In addition to the TFuture!AccumulatedType interface (where 567 * + AccumulatedType is the return type of the accumulator function), the 568 * + returned object also offers two additional methods, finish() and 569 * + finishGet(): By default, the accumulator functions is called after all 570 * + the results from the pool clients have become available. Calling finish() 571 * + causes the accumulator future to stop waiting for other results and 572 * + immediately invoking the accumulator function on the results currently 573 * + available. If all results are already available, finish() is a no-op. 574 * + finishGet() is a convenience shortcut for combining it with 575 * + a call to get() immediately afterwards, like waitGet() is for wait(). 576 * + 577 * + The acc alias can point to any callable accepting either an array of 578 * + return values or an array of return values and an array of exceptions; 579 * + see isAccumulator!() for details. The default accumulator concatenates 580 * + return values that can be concatenated with each others (e.g. arrays), 581 * + and simply returns an array of values otherwise, failing with a 582 * + TCompoundOperationException no values were returned. 583 * + 584 * + The accumulator function is not executed in any of the async manager 585 * + worker threads associated with the async clients, but instead it is 586 * + invoked when the actual result is requested for the first time after the 587 * + operation has been completed. This also includes checking the status 588 * + of the operation once it is no longer running, since the accumulator 589 * + has to be run to determine whether the operation succeeded or failed. 590 * +/ 591 * auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!acc); 592 * --- 593 * 594 * Example: 595 * --- 596 * // Some Thrift service. 597 * interface Foo { 598 * int foo(string name); 599 * byte[] bar(); 600 * } 601 * 602 * // Create the aggregator pool – client0, client1, client2 are some 603 * // TAsyncClient!Foo instances, but in theory could also be other 604 * // TFutureInterface!Foo implementations (e.g. some async client pool). 605 * auto pool = new TAsyncAggregator!Foo([client0, client1, client2]); 606 * 607 * foreach (val; pool.foo("baz").range(dur!"seconds"(1))) { 608 * // Process all the results that are available before a second has passed, 609 * // in the order they arrive. 610 * writeln(val); 611 * } 612 * 613 * auto sumRoots = pool.foo("baz").accumulate!((int[] vals, Exceptions[] exs){ 614 * if (vals.empty) { 615 * throw new TCompoundOperationException("All clients failed", exs); 616 * } 617 * 618 * // Just to illustrate that the type of the values can change, convert the 619 * // numbers to double and sum up their roots. 620 * double result = 0; 621 * foreach (v; vals) result += sqrt(cast(double)v); 622 * return result; 623 * })(); 624 * 625 * // Wait up to three seconds for the result, and then accumulate what has 626 * // arrived so far. 627 * sumRoots.completion.wait(dur!"seconds"(3)); 628 * writeln(sumRoots.finishGet()); 629 * 630 * // For scalars, the default accumulator returns an array of the values. 631 * pragma(msg, typeof(pool.foo("").accumulate().get()); // int[]. 632 * 633 * // For lists, etc., it concatenates the results together. 634 * pragma(msg, typeof(pool.bar().accumulate().get())); // byte[]. 635 * --- 636 * 637 * Note: For the accumulate!() interface, you might currently hit a »cannot use 638 * local '…' as parameter to non-global template accumulate«-error, see 639 * $(DMDBUG 5710, DMD issue 5710). If your accumulator function does not need 640 * to access the surrounding scope, you might want to use a function literal 641 * instead of a delegate to avoid the issue. 642 */ 643 class TAsyncAggregator(Interface) if (isBaseService!Interface) { 644 /// Shorthand for the client type this instance operates on. 645 alias TAsyncClientBase!Interface Client; 646 647 /// 648 this(Client[] clients) { 649 clients_ = clients; 650 } 651 652 /// Whether to open the underlying transports of a client before trying to 653 /// execute a method if they are not open. This is usually desirable 654 /// because it allows e.g. to automatically reconnect to a remote server 655 /// if the network connection is dropped. 656 /// 657 /// Defaults to true. 658 bool reopenTransports = true; 659 660 mixin AggregatorOpDispatch!(); 661 662 private: 663 Client[] clients_; 664 } 665 666 /// Ditto 667 class TAsyncAggregator(Interface) if (isDerivedService!Interface) : 668 TAsyncAggregator!(BaseService!Interface) 669 { 670 /// Shorthand for the client type this instance operates on. 671 alias TAsyncClientBase!Interface Client; 672 673 /// 674 this(Client[] clients) { 675 super(cast(TAsyncClientBase!(BaseService!Interface)[])clients); 676 } 677 678 mixin AggregatorOpDispatch!(); 679 } 680 681 /** 682 * Whether fun is a valid accumulator function for values of type ValueType. 683 * 684 * For this to be true, fun must be a callable matching one of the following 685 * argument lists: 686 * --- 687 * fun(ValueType[] values); 688 * fun(ValueType[] values, Exception[] exceptions); 689 * --- 690 * 691 * The second version is passed the collected array exceptions from all the 692 * clients in the pool. 693 * 694 * The return value of the accumulator function is passed to the client (via 695 * the result future). If it throws an exception, the operation is marked as 696 * failed with the given exception instead. 697 */ 698 template isAccumulator(ValueType, alias fun) { 699 enum isAccumulator = is(typeof(fun(cast(ValueType[])[]))) || 700 is(typeof(fun(cast(ValueType[])[], cast(Exception[])[]))); 701 } 702 703 /** 704 * TAsyncAggregator construction helper to avoid having to explicitly 705 * specify the interface type, i.e. to allow the constructor being called 706 * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)). 707 */ 708 TAsyncAggregator!Interface tAsyncAggregator(Interface)( 709 TAsyncClientBase!Interface[] clients 710 ) if (isService!Interface) { 711 return new typeof(return)(clients); 712 } 713 714 private { 715 mixin template AggregatorOpDispatch() { 716 auto opDispatch(string name, Args...)(Args args) if ( 717 is(typeof(mixin("Interface.init." ~ name)(args))) 718 ) { 719 alias ReturnType!(MemberType!(Interface, name)) ResultType; 720 721 auto childCancellation = new TCancellationOrigin; 722 723 TFuture!ResultType[] futures; 724 futures.reserve(clients_.length); 725 726 foreach (c; cast(Client[])clients_) { 727 if (reopenTransports) { 728 if (!c.transport.isOpen) { 729 try { 730 c.transport.open(); 731 } catch (Exception e) { 732 continue; 733 } 734 } 735 } 736 futures ~= mixin("c." ~ name)(args, childCancellation); 737 } 738 739 return AggregationResult!ResultType(futures, childCancellation); 740 } 741 } 742 743 struct AggregationResult(T) { 744 auto opSlice() { 745 return range(); 746 } 747 748 auto range(Duration timeout = dur!"hnsecs"(0)) { 749 return tFutureAggregatorRange(futures_, childCancellation_, timeout); 750 } 751 752 auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!(T, acc)) { 753 return new AccumulatorJob!(T, acc)(futures_, childCancellation_); 754 } 755 756 private: 757 TFuture!T[] futures_; 758 TCancellationOrigin childCancellation_; 759 } 760 761 auto defaultAccumulator(T)(T[] values, Exception[] exceptions) { 762 if (values.empty) { 763 throw new TCompoundOperationException("All clients failed", 764 exceptions); 765 } 766 767 static if (is(typeof(T.init ~ T.init))) { 768 import std.algorithm; 769 return reduce!"a ~ b"(values); 770 } else { 771 return values; 772 } 773 } 774 775 final class AccumulatorJob(T, alias accumulator) if ( 776 isAccumulator!(T, accumulator) 777 ) : TFuture!(AccumulatorResult!(T, accumulator)) { 778 this(TFuture!T[] futures, TCancellationOrigin childCancellation) { 779 futures_ = futures; 780 childCancellation_ = childCancellation; 781 resultMutex_ = new Mutex; 782 completionEvent_ = new TOneshotEvent; 783 784 foreach (future; futures) { 785 future.completion.addCallback({ 786 auto f = future; 787 return { 788 synchronized (resultMutex_) { 789 if (f.status == TFutureStatus.CANCELLED) { 790 if (!finished_) { 791 status_ = TFutureStatus.CANCELLED; 792 finished_ = true; 793 } 794 return; 795 } 796 797 if (f.status == TFutureStatus.FAILED) { 798 exceptions_ ~= f.getException(); 799 } else { 800 results_ ~= f.get(); 801 } 802 803 if (results_.length + exceptions_.length == futures_.length) { 804 finished_ = true; 805 completionEvent_.trigger(); 806 } 807 } 808 }; 809 }()); 810 } 811 } 812 813 TFutureStatus status() @property { 814 synchronized (resultMutex_) { 815 if (!finished_) return TFutureStatus.RUNNING; 816 if (status_ != TFutureStatus.RUNNING) return status_; 817 818 try { 819 result_ = invokeAccumulator!accumulator(results_, exceptions_); 820 status_ = TFutureStatus.SUCCEEDED; 821 } catch (Exception e) { 822 exception_ = e; 823 status_ = TFutureStatus.FAILED; 824 } 825 826 return status_; 827 } 828 } 829 830 TAwaitable completion() @property { 831 return completionEvent_; 832 } 833 834 AccumulatorResult!(T, accumulator) get() { 835 auto s = status; 836 837 enforce(s != TFutureStatus.RUNNING, 838 new TFutureException("Operation not yet completed.")); 839 840 if (s == TFutureStatus.CANCELLED) throw new TCancelledException; 841 if (s == TFutureStatus.FAILED) throw exception_; 842 return result_; 843 } 844 845 Exception getException() { 846 auto s = status; 847 enforce(s != TFutureStatus.RUNNING, 848 new TFutureException("Operation not yet completed.")); 849 850 if (s == TFutureStatus.CANCELLED) throw new TCancelledException; 851 852 if (s == TFutureStatus.SUCCEEDED) { 853 return null; 854 } 855 return exception_; 856 } 857 858 void finish() { 859 synchronized (resultMutex_) { 860 if (!finished_) { 861 finished_ = true; 862 childCancellation_.trigger(); 863 completionEvent_.trigger(); 864 } 865 } 866 } 867 868 auto finishGet() { 869 finish(); 870 return get(); 871 } 872 873 private: 874 TFuture!T[] futures_; 875 TCancellationOrigin childCancellation_; 876 877 bool finished_; 878 T[] results_; 879 Exception[] exceptions_; 880 881 TFutureStatus status_; 882 Mutex resultMutex_; 883 union { 884 AccumulatorResult!(T, accumulator) result_; 885 Exception exception_; 886 } 887 TOneshotEvent completionEvent_; 888 } 889 890 auto invokeAccumulator(alias accumulator, T)( 891 T[] values, Exception[] exceptions 892 ) if ( 893 isAccumulator!(T, accumulator) 894 ) { 895 static if (is(typeof(accumulator(values, exceptions)))) { 896 return accumulator(values, exceptions); 897 } else { 898 return accumulator(values); 899 } 900 } 901 902 template AccumulatorResult(T, alias acc) { 903 alias typeof(invokeAccumulator!acc(cast(T[])[], cast(Exception[])[])) 904 AccumulatorResult; 905 } 906 }