/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module thrift.util.future;

import core.atomic;
import core.sync.condition;
import core.sync.mutex;
import core.time : dur, Duration, TickDuration;
import std.array : empty, front, popFront;
import std.conv : to;
import std.exception : enforce;
import std.traits : BaseTypeTuple, isSomeFunction, ParameterTypeTuple, ReturnType;
import thrift.base;
import thrift.util.awaitable;
import thrift.util.cancellation;

/**
 * Represents an operation which is executed asynchronously and the result of
 * which will become available at some point in the future.
 *
 * Once an operation is completed, the result of the operation can be fetched
 * via the get() family of methods. There are three possible cases: Either the
 * operation succeeded, then its return value is returned, or it failed by
 * throwing, in which case the exception is rethrown, or it was cancelled
 * before, then a TCancelledException is thrown. There might be TFuture
 * implementations which never possibly enter the cancelled state.
 *
 * All methods are thread-safe, but keep in mind that any exception object or
 * result (if it is a reference type, of course) is shared between all
 * get()-family invocations.
 */
interface TFuture(ResultType) {
  /**
   * The status the operation is currently in.
   *
   * An operation starts out in RUNNING status, and changes state to one of the
   * others at most once afterwards.
   */
  TFutureStatus status() @property;

  /**
   * A TAwaitable triggered when the operation leaves the RUNNING status.
   */
  TAwaitable completion() @property;

  /**
   * Convenience shorthand for waiting until the result is available and then
   * get()ing it.
   *
   * If the operation has already completed, the result is immediately
   * returned.
   *
   * The result of this method is »alias this«'d to the interface, so that
   * TFuture can be used as a drop-in replacement for a simple value in
   * synchronous code.
   */
  final ResultType waitGet() {
    completion.wait();
    return get();
  }
  final @property auto waitGetProperty() { return waitGet(); }
  alias waitGetProperty this;

  /**
   * Convenience shorthand for waiting until the result is available and then
   * get()ing it.
   *
   * If the operation completes in time, returns its result (resp. throws an
   * exception for the failed/cancelled cases). If not, throws a
   * TFutureException.
   */
  final ResultType waitGet(Duration timeout) {
    enforce(completion.wait(timeout), new TFutureException(
      "Operation did not complete in time."));
    return get();
  }

  /**
   * Returns the result of the operation.
   *
   * Throws: TFutureException if the operation is not yet done,
   *   TCancelledException if it has been cancelled; the set exception if it
   *   failed.
   */
  ResultType get();

  /**
   * Returns the captured exception if the operation failed, or null otherwise.
   *
   * Throws: TFutureException if not yet done, TCancelledException if the
   *   operation has been cancelled.
   */
  Exception getException();
}

/**
 * The states the operation offering a future interface can be in.
 */
enum TFutureStatus : byte {
  RUNNING, /// The operation is still running.
  SUCCEEDED, /// The operation completed without throwing an exception.
  FAILED, /// The operation completed by throwing an exception.
  CANCELLED /// The operation was cancelled.
}

/**
 * A TFuture covering the simple but common case where the result is simply
 * set by a call to succeed()/fail().
 *
 * All methods are thread-safe, but usually, succeed()/fail() are only called
 * from a single thread (different from the thread(s) waiting for the result
 * using the TFuture interface, though).
 */
class TPromise(ResultType) : TFuture!ResultType {
  /// Creates a promise in RUNNING status.
  this() {
    statusMutex_ = new Mutex;
    completionEvent_ = new TOneshotEvent;
  }

  /// The current status, read atomically without taking statusMutex_.
  override S status() const @property {
    return atomicLoad(status_);
  }

  /// The event triggered by succeed()/fail()/complete()/cancel().
  override TAwaitable completion() @property {
    return completionEvent_;
  }

  /**
   * Returns the result set by succeed()/complete().
   *
   * Throws: TFutureException if the operation is still running,
   *   TCancelledException if it was cancelled; the stored exception if it
   *   failed.
   */
  override ResultType get() {
    auto s = atomicLoad(status_);
    enforce(s != S.RUNNING,
      new TFutureException("Operation not yet completed."));

    if (s == S.CANCELLED) throw new TCancelledException;
    if (s == S.FAILED) throw exception_;

    static if (!is(ResultType == void)) {
      return result_;
    }
  }

  /**
   * Returns the stored exception if the operation failed, or null if it
   * succeeded.
   *
   * Throws: TFutureException if the operation is still running,
   *   TCancelledException if it was cancelled.
   */
  override Exception getException() {
    auto s = atomicLoad(status_);
    enforce(s != S.RUNNING,
      new TFutureException("Operation not yet completed."));

    if (s == S.CANCELLED) throw new TCancelledException;
    if (s == S.SUCCEEDED) return null;

    return exception_;
  }

  static if (!is(ResultType == void)) {
    /**
     * Sets the result of the operation, marks it as done, and notifies any
     * waiters.
     *
     * If the operation has been cancelled before, nothing happens.
     *
     * Throws: TFutureException if the operation is already completed.
     */
    void succeed(ResultType result) {
      synchronized (statusMutex_) {
        auto s = atomicLoad(status_);
        if (s == S.CANCELLED) return;

        enforce(s == S.RUNNING,
          new TFutureException("Operation already completed."));
        // Store the payload before publishing the new status, so readers
        // that observe SUCCEEDED also observe the result.
        result_ = result;

        atomicStore(status_, S.SUCCEEDED);
      }

      completionEvent_.trigger();
    }
  } else {
    /**
     * Marks the operation as done and notifies any waiters.
     *
     * If the operation has been cancelled before, nothing happens.
     *
     * Throws: TFutureException if the operation is already completed.
     */
    void succeed() {
      synchronized (statusMutex_) {
        auto s = atomicLoad(status_);
        if (s == S.CANCELLED) return;

        enforce(s == S.RUNNING,
          new TFutureException("Operation already completed."));

        atomicStore(status_, S.SUCCEEDED);
      }

      completionEvent_.trigger();
    }
  }

  /**
   * Marks the operation as failed with the specified exception and notifies
   * any waiters.
   *
   * If the operation was already cancelled, nothing happens.
   *
   * Throws: TFutureException if the operation is already completed.
   */
  void fail(Exception exception) {
    synchronized (statusMutex_) {
      auto status = atomicLoad(status_);
      if (status == S.CANCELLED) return;

      enforce(status == S.RUNNING,
        new TFutureException("Operation already completed."));
      // Store the payload before publishing the new status.
      exception_ = exception;

      atomicStore(status_, S.FAILED);
    }

    completionEvent_.trigger();
  }


  /**
   * Marks this operation as completed and takes over the outcome of another
   * TFuture of the same type.
   *
   * If this operation was already cancelled, nothing happens. If the other
   * operation was cancelled, this operation is marked as failed with a
   * TCancelledException.
   *
   * Throws: TFutureException if the passed in future was not completed or
   *   this operation is already completed.
   */
  void complete(TFuture!ResultType future) {
    synchronized (statusMutex_) {
      auto status = atomicLoad(status_);
      if (status == S.CANCELLED) return;
      enforce(status == S.RUNNING,
        new TFutureException("Operation already completed."));

      enforce(future.status != S.RUNNING, new TFutureException(
        "The passed TFuture is not yet completed."));

      status = future.status;
      if (status == S.CANCELLED) {
        // A cancelled source is reported as a failure of this promise.
        status = S.FAILED;
        exception_ = new TCancelledException;
      } else if (status == S.FAILED) {
        exception_ = future.getException();
      } else static if (!is(ResultType == void)) {
        result_ = future.get();
      }

      atomicStore(status_, status);
    }

    completionEvent_.trigger();
  }

  /**
   * Marks this operation as cancelled and notifies any waiters.
   *
   * If the operation is already completed, the status is left untouched.
   */
  void cancel() {
    synchronized (statusMutex_) {
      auto status = atomicLoad(status_);
      if (status == S.RUNNING) atomicStore(status_, S.CANCELLED);
    }

    completionEvent_.trigger();
  }

private:
  // Convenience alias because TFutureStatus is ubiquitous in this class.
  alias TFutureStatus S;

  // The status the promise is currently in.
  shared S status_;

  union {
    static if (!is(ResultType == void)) {
      // Set if status_ is SUCCEEDED.
      ResultType result_;
    }
    // Set if status_ is FAILED.
    Exception exception_;
  }

  // Protects status_.
  // As for result_ and exception_: They are only set once, while status_ is
  // still RUNNING, so given that the operation has already completed, reading
  // them is safe without holding some kind of lock.
  Mutex statusMutex_;

  // Triggered when the event completes.
  TOneshotEvent completionEvent_;
}

///
class TFutureException : TException {
  ///
  this(string msg = "", string file = __FILE__, size_t line = __LINE__,
    Throwable next = null)
  {
    super(msg, file, line, next);
  }
}

/**
 * Creates an interface that is similar to a given one, but accepts an
 * additional, optional TCancellation parameter for each method, and returns
 * TFutures instead of plain return values.
 *
 * For example, given the following declarations:
 * ---
 * interface Foo {
 *   void bar();
 *   string baz(int a);
 * }
 * alias TFutureInterface!Foo FutureFoo;
 * ---
 *
 * FutureFoo would be equivalent to:
 * ---
 * interface FutureFoo {
 *   TFuture!void bar(TCancellation cancellation = null);
 *   TFuture!string baz(int a, TCancellation cancellation = null);
 * }
 * ---
 */
template TFutureInterface(Interface) if (is(Interface _ == interface)) {
  mixin({
    string code = "interface TFutureInterface \n";

    static if (is(Interface Bases == super) && Bases.length > 0) {
      code ~= ": ";
      foreach (i; 0 .. Bases.length) {
        if (i > 0) code ~= ", ";
        code ~= "TFutureInterface!(BaseTypeTuple!Interface[" ~ to!string(i) ~ "]) ";
      }
    }

    code ~= "{\n";

    foreach (methodName; __traits(derivedMembers, Interface)) {
      enum qn = "Interface." ~ methodName;
      static if (isSomeFunction!(mixin(qn))) {
        code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~
          "(ParameterTypeTuple!(" ~ qn ~ "), TCancellation cancellation = null);\n";
      }
    }

    code ~= "}\n";
    return code;
  }());
}

/**
 * An input range that aggregates results from multiple asynchronous operations,
 * returning them in the order they arrive.
 *
 * Additionally, a timeout can be set after which results from not yet finished
 * futures will no longer be waited for, e.g. to ensure the time it takes to
 * iterate over a set of results is limited.
 */
final class TFutureAggregatorRange(T) {
  /**
   * Constructs a new instance.
   *
   * Params:
   *   futures = The set of futures to collect results from.
   *   childCancellation = Triggered when the timeout expires while results
   *     are still outstanding.
   *   timeout = If positive, not yet finished futures will be cancelled and
   *     their results will not be taken into account. If zero or negative,
   *     no timeout is applied.
   */
  this(TFuture!T[] futures, TCancellationOrigin childCancellation,
    Duration timeout = dur!"hnsecs"(0)
  ) {
    if (timeout > dur!"hnsecs"(0)) {
      hasTimeout_ = true;
      timeoutSysTick_ = TickDuration.currSystemTick +
        TickDuration.from!"hnsecs"(timeout.total!"hnsecs");
    } else {
      timeoutSysTick_ = TickDuration(0);
    }

    queueMutex_ = new Mutex;
    queueNonEmptyCondition_ = new Condition(queueMutex_);
    futures_ = futures;
    childCancellation_ = childCancellation;

    foreach (future; futures_) {
      // The immediately invoked outer delegate gives every callback its own
      // copy of the loop variable.
      future.completion.addCallback({
        auto f = future;
        return {
          immutable status = f.status;
          assert(status != TFutureStatus.RUNNING);

          synchronized (queueMutex_) {
            if (status == TFutureStatus.CANCELLED) {
              // Cancelled futures never yield a result, but they have to be
              // counted so that empty() can tell when nothing more can
              // arrive.
              ++cancelledCount_;
            } else {
              completedQueue_ ~= f;
            }

            queueNonEmptyCondition_.notifyAll();
          }
        };
      }());
    }
  }

  /**
   * Whether the range is empty.
   *
   * This is the case if the results from the completed futures not having
   * failed have already been popped and either all futures have finished
   * or the timeout has expired.
   *
   * Potentially blocks until a new result is available, all futures have
   * completed, or the timeout has expired.
   */
  bool empty() @property {
    // A buffered result must stay reachable even if it came from the very
    // last future (in which case finished_ is already set), so this check
    // has to come first.
    if (bufferFilled_) return false;
    if (finished_) return true;

    while (true) {
      TFuture!T future;
      synchronized (queueMutex_) {
        // The while loop is just being cautious about spurious wakeups, in
        // case they should be possible.
        while (completedQueue_.empty) {
          if (completedCount_ + cancelledCount_ >= futures_.length) {
            // Every future has either been consumed or was cancelled, so no
            // further result can ever arrive.
            finished_ = true;
            return true;
          }

          if (!hasTimeout_) {
            // No timeout configured: wait until a callback notifies us.
            queueNonEmptyCondition_.wait();
            continue;
          }

          auto remaining = to!Duration(timeoutSysTick_ -
            TickDuration.currSystemTick);

          if (remaining <= dur!"hnsecs"(0)) {
            // No time left, but still no element received – we are empty now.
            finished_ = true;
            childCancellation_.trigger();
            return true;
          }

          queueNonEmptyCondition_.wait(remaining);
        }

        future = completedQueue_.front;
        completedQueue_.popFront();
      }

      ++completedCount_;
      if (completedCount_ == futures_.length) {
        // This was the last future in the list, there is no possibility
        // another result could ever become available.
        finished_ = true;
      }

      if (future.status == TFutureStatus.FAILED) {
        // This one failed, loop again and try getting another item from
        // the queue (or notice that none can arrive anymore).
        exceptions_ ~= future.getException();
      } else {
        resultBuffer_ = future.get();
        bufferFilled_ = true;
        return false;
      }
    }
  }

  /**
   * Returns the first element from the range.
   *
   * Potentially blocks until a new result is available or the timeout has
   * expired.
   *
   * Throws: TException if the range is empty.
   */
  T front() {
    enforce(!empty, new TException(
      "Cannot get front of an empty future aggregator range."));
    return resultBuffer_;
  }

  /**
   * Removes the first element from the range.
   *
   * Potentially blocks until a new result is available or the timeout has
   * expired.
   *
   * Throws: TException if the range is empty.
   */
  void popFront() {
    enforce(!empty, new TException(
      "Cannot pop front of an empty future aggregator range."));
    bufferFilled_ = false;
  }

  /**
   * The number of futures the result of which has been returned or which have
   * failed so far.
   */
  size_t completedCount() @property const {
    return completedCount_;
  }

  /**
   * The exceptions collected from failed TFutures so far.
   */
  Exception[] exceptions() @property {
    return exceptions_;
  }

private:
  TFuture!T[] futures_;
  TCancellationOrigin childCancellation_;

  // Whether a (positive) timeout was passed to the constructor.
  bool hasTimeout_;

  // The system tick this operation will time out, or zero if no timeout has
  // been set.
  TickDuration timeoutSysTick_;

  bool finished_;

  bool bufferFilled_;
  T resultBuffer_;

  Exception[] exceptions_;
  size_t completedCount_;

  // The queue of completed futures and the number of cancelled ones. These
  // (and the associated condition) are the only parts of this class that are
  // accessed by multiple threads; both are guarded by queueMutex_.
  TFuture!T[] completedQueue_;
  size_t cancelledCount_;
  Mutex queueMutex_;
  Condition queueNonEmptyCondition_;
}

/**
 * TFutureAggregatorRange construction helper to avoid having to explicitly
 * specify the value type, i.e. to allow the constructor being called using IFTI
 * (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
 */
TFutureAggregatorRange!T tFutureAggregatorRange(T)(TFuture!T[] futures,
  TCancellationOrigin childCancellation, Duration timeout = dur!"hnsecs"(0)
) {
  return new TFutureAggregatorRange!T(futures, childCancellation, timeout);
}