/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/**
 * Defines the interface used for client-side handling of asynchronous
 * I/O operations, based on coroutines.
 *
 * The main piece of the »client side« (e.g. for TAsyncClient users) of the
 * API is TFuture, which represents an asynchronously executed operation,
 * which can have a return value, throw exceptions, and which can be waited
 * upon.
 *
 * On the »implementation side«, the idea is that by using a TAsyncTransport
 * instead of a normal TTransport and executing the work through a
 * TAsyncManager, the same code as for synchronous I/O can be used for
 * asynchronous operation as well, for example:
 *
 * ---
 * auto socket = new TAsyncSocket(someTAsyncSocketManager(), host, port);
 * // …
 * socket.asyncManager.execute(socket, {
 *   SomeThriftStruct s;
 *
 *   // Waiting for socket I/O will not block an entire thread but cause
 *   // the async manager to execute another task in the meantime, because
 *   // we are using TAsyncSocket instead of TSocket.
 *   s.read(socket);
 *
 *   // Do something with s, e.g. set a TPromise result to it.
 *   writeln(s);
 * });
 * ---
 */
module thrift.async.base;

import core.time : Duration, dur;
import std.socket/+ : Socket+/; // DMD @@BUG314@@
import thrift.base;
import thrift.transport.base;
import thrift.util.cancellation;

/**
 * Manages one or more asynchronous transport resources (e.g. sockets in the
 * case of TAsyncSocketManager) and allows work items to be submitted for them.
 *
 * Implementations will typically run one or more background threads for
 * executing the work, which is one of the reasons for a TAsyncManager to be
 * used. Each work item is run in its own fiber and is expected to yield() away
 * while waiting for time-consuming operations.
 *
 * The second important purpose of TAsyncManager is to serialize access to
 * the transport resources – without taking care of that, e.g. issuing multiple
 * RPC calls over the same connection in rapid succession would likely lead to
 * more than one request being written at the same time, causing only garbage
 * to arrive at the remote end.
 *
 * All methods are thread-safe.
 */
interface TAsyncManager {
  /**
   * Submits a work item to be executed asynchronously.
   *
   * Access to async transports is serialized – if two work items associated
   * with the same transport are submitted, the second delegate will not be
   * invoked until the first has returned, even if the latter context-switches
   * away (because it is waiting for I/O) and the async manager is idle
   * otherwise.
   *
   * Optionally, a TCancellation instance can be specified. If present,
   * triggering it will be considered a request to cancel the work item, if it
   * is still waiting for the associated transport to become available.
   * Delegates which are already being processed (i.e. waiting for I/O) are not
   * affected because this would bring the connection into an undefined state
   * (as probably a half-written request or a half-read response would be left
   * behind).
   *
   * Params:
   *   transport = The TAsyncTransport the work delegate will operate on. Must
   *     be associated with this TAsyncManager instance.
   *   work = The operations to execute on the given transport. Must never
   *     throw, errors should be handled in another way. nothrow semantics are
   *     difficult to enforce in combination with fibers though, so currently
   *     exceptions are just swallowed by TAsyncManager implementations.
   *   cancellation = If set, can be used to request cancellation of this work
   *     item if it is still waiting to be executed.
   *
   * Note: The work item will likely be executed in a different thread, so make
   *   sure the code it relies on is thread-safe. The async transports
   *   themselves are an exception, as access to them is serialized as noted
   *   above.
   */
  void execute(TAsyncTransport transport, void delegate() work,
    TCancellation cancellation = null
  ) in {
    // Precondition: the transport has to belong to this manager instance.
    assert(transport.asyncManager is this,
      "The given transport must be associated with this TAsyncManager.");
  }

  /**
   * Submits a delegate to be executed after a certain amount of time has
   * passed.
   *
   * The actual amount of time elapsed can be higher if the async manager
   * instance is busy and thus should not be relied on.
   *
   * Params:
   *   duration = The amount of time to wait before starting to execute the
   *     work delegate.
   *   work = The code to execute after the specified amount of time has passed.
   *
   * Example:
   * ---
   * // A very basic example – usually, the actual work item would enqueue
   * // some async transport operation.
   * auto asyncManager = someAsyncManager();
   *
   * TFuture!int calculate() {
   *   // Create a promise and asynchronously set its value after three
   *   // seconds have passed.
   *   auto promise = new TPromise!int;
   *   asyncManager.delay(dur!"seconds"(3), {
   *     promise.succeed(42);
   *   });
   *
   *   // Immediately return it to the caller.
   *   return promise;
   * }
   *
   * // This will wait until the result is available and then print it.
   * writeln(calculate().waitGet());
   * ---
   */
  void delay(Duration duration, void delegate() work);

  /**
   * Shuts down all background threads or other facilities that might have
   * been started in order to execute work items. This function is typically
   * called during program shutdown.
   *
   * If there are still tasks to be executed when the timeout expires, any
   * currently executed work items will never receive any notifications
   * for async transports managed by this instance, queued work items will
   * be silently dropped, and implementations are allowed to leak resources.
   *
   * Params:
   *   waitFinishTimeout = If positive, waits for all work items to be
   *     finished for the specified amount of time, if negative, waits for
   *     completion without ever timing out, if zero, immediately shuts down
   *     the background facilities.
   *
   * Returns: NOTE(review): the meaning of the result is not specified by this
   *   interface – presumably whether all work items were finished before
   *   shutting down; confirm against the implementations.
   */
  bool stop(Duration waitFinishTimeout = dur!"hnsecs"(-1));
}

/**
 * A TTransport which uses a TAsyncManager to schedule non-blocking operations.
 *
 * The actual type of device is not specified; typically, implementations will
 * depend on an interface derived from TAsyncManager to be notified of changes
 * in the transport state.
 *
 * The peeking, reading, writing and flushing methods must always be called
 * from within the associated async manager.
 */
interface TAsyncTransport : TTransport {
  /**
   * The TAsyncManager associated with this transport.
   */
  TAsyncManager asyncManager() @property;
}

/**
 * A TAsyncManager providing notifications for socket events.
 */
interface TAsyncSocketManager : TAsyncManager {
  /**
   * Adds a listener that is triggered once when an event of the specified type
   * occurs, and removed afterwards.
   *
   * Params:
   *   socket = The socket to listen for events at.
   *   eventType = The type of the event to listen for.
   *   timeout = The period of time after which the listener will be called
   *     with TAsyncEventReason.TIMED_OUT if no event happened.
   *   listener = The delegate to call when an event happened.
   */
  void addOneshotListener(Socket socket, TAsyncEventType eventType,
    Duration timeout, TSocketEventListener listener);

  /// Ditto
  void addOneshotListener(Socket socket, TAsyncEventType eventType,
    TSocketEventListener listener);
}

/**
 * Types of events that can happen for an asynchronous transport.
 */
enum TAsyncEventType {
  READ, /// New data became available to read.
  WRITE /// The transport became ready to be written to.
}

/**
 * The type of the delegates used to register socket event handlers.
 */
alias void delegate(TAsyncEventReason callReason) TSocketEventListener;

/**
 * The reason a listener was called.
 */
enum TAsyncEventReason : byte {
  NORMAL, /// The event listened for was triggered normally.
  TIMED_OUT /// A timeout for the event was set, and it expired.
}