/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module thrift.async.libevent;

import core.atomic;
import core.time : Duration, dur;
import core.exception : onOutOfMemoryError;
import core.memory : GC;
import core.thread : Fiber, Thread;
import core.sync.condition;
import core.sync.mutex;
import core.stdc.stdlib : free, malloc;
import deimos.event2.event;
import std.array : empty, front, popFront;
import std.conv : text, to;
import std.exception : enforce;
import std.socket : Socket, socketPair;
import thrift.base;
import thrift.async.base;
import thrift.internal.socket;
import thrift.internal.traits;
import thrift.util.cancellation;

// To avoid DMD @@BUG6395@@.
import thrift.internal.algorithm;

/**
 * A TAsyncManager implementation based on libevent.
 *
 * The libevent loop for handling non-blocking sockets is run in a background
 * thread, which is lazily spawned. The thread is not daemonized to avoid
 * crashes on program shutdown, it is only stopped when the manager instance
 * is destroyed. So, to ensure a clean program teardown, either make sure this
 * instance gets destroyed (e.g. by using scope), or manually call stop() at
 * the end.
 */
class TLibeventAsyncManager : TAsyncSocketManager {
  /**
   * Creates the libevent base, the control socket pair and the persistent
   * event that dispatches incoming control messages. The worker thread itself
   * is not started here, but lazily on the first execute()/delay() call.
   */
  this() {
    eventBase_ = event_base_new();

    // Set up the socket pair for transferring control messages to the event
    // loop.
    auto pair = socketPair();
    controlSendSocket_ = pair[0];
    controlReceiveSocket_ = pair[1];
    controlReceiveSocket_.blocking = false;

    // Register an event for receiving control messages.
    controlReceiveEvent_ = event_new(eventBase_,
      cast(evutil_socket_t)controlReceiveSocket_.handle,
      EV_READ | EV_PERSIST | EV_ET, assumeNothrow(&controlMsgReceiveCallback),
      cast(void*)this);
    event_add(controlReceiveEvent_, null);

    queuedCountMutex_ = new Mutex;
    zeroQueuedCondition_ = new Condition(queuedCountMutex_);
  }

  ~this() {
    // stop() should be safe to call, because either we don't have a worker
    // thread running and it is a no-op anyway, or it is guaranteed to be
    // still running (blocked in event_base_loop), and thus guaranteed not to
    // be garbage collected yet.
    stop(dur!"hnsecs"(0));

    event_free(controlReceiveEvent_);
    event_base_free(eventBase_);
    eventBase_ = null;
  }

  /**
   * Hands a work item for the given transport over to the worker thread.
   *
   * If a cancellation is passed and already triggered, nothing is enqueued.
   * Otherwise, a callback is registered which asks the worker thread to drop
   * the item from the queue once the cancellation is triggered.
   *
   * Throws: TException if the control message could not be sent as a whole.
   */
  override void execute(TAsyncTransport transport, Work work,
    TCancellation cancellation = null
  ) {
    if (cancellation && cancellation.triggered) return;

    // Keep track that there is a new work item to be processed.
    incrementQueuedCount();

    ensureWorkerThreadRunning();

    // We should be able to send the control message as a whole – we currently
    // assume to be able to receive it at once as well. If this proves to be
    // unstable (e.g. send could possibly return early if the receiving buffer
    // is full and the blocking call gets interrupted by a signal), it could
    // be changed to a more sophisticated scheme.

    // Make sure the delegate context doesn't get GCd while the work item is
    // on the wire.
    GC.addRoot(work.ptr);

    // Send work message.
    sendControlMsg(ControlMsg(MsgType.WORK, work, transport));

    if (cancellation) {
      cancellation.triggering.addCallback({
        sendControlMsg(ControlMsg(MsgType.CANCEL, work, transport));
      });
    }
  }

  /**
   * Schedules work to be invoked from the event loop after the given duration
   * has passed. The pending delay is counted as a queued item until it has
   * been executed.
   */
  override void delay(Duration duration, void delegate() work) {
    incrementQueuedCount();

    ensureWorkerThreadRunning();

    const tv = toTimeval(duration);

    // DMD @@BUG@@: Cannot deduce T to void delegate() here.
    registerOneshotEvent!(void delegate())(
      -1, 0, assumeNothrow(&delayCallback), &tv,
      {
        work();
        decrementQueuedCount();
      }
    );
  }

  /**
   * Shuts down the worker thread, if it is running.
   *
   * Params:
   *   waitFinishTimeout = If positive, waits (once) at most this long for the
   *     queued item count to reach zero. If negative, waits until it does. If
   *     zero, the event loop is stopped immediately.
   *
   * Returns: Whether no items were queued anymore when the event loop was
   *   shut down (always true if no worker thread was running).
   */
  override bool stop(Duration waitFinishTimeout = dur!"hnsecs"(-1)) {
    bool cleanExit = true;

    synchronized (this) {
      if (workerThread_) {
        synchronized (queuedCountMutex_) {
          if (waitFinishTimeout > dur!"hnsecs"(0)) {
            if (queuedCount_ > 0) {
              zeroQueuedCondition_.wait(waitFinishTimeout);
            }
          } else if (waitFinishTimeout < dur!"hnsecs"(0)) {
            while (queuedCount_ > 0) zeroQueuedCondition_.wait();
          } else {
            // waitFinishTimeout is zero, immediately exit in all cases.
          }
          cleanExit = (queuedCount_ == 0);
        }

        event_base_loopbreak(eventBase_);
        sendControlMsg(ControlMsg(MsgType.SHUTDOWN));
        workerThread_.join();
        workQueues_ = null;
        // We have nuked all currently enqueued items, so set the count to
        // zero. This is safe to do without locking, since the worker thread
        // is down.
        queuedCount_ = 0;
        atomicStore(*(cast(shared)&workerThread_), cast(shared(Thread))null);
      }
    }

    return cleanExit;
  }

  /**
   * Registers a listener which is invoked once when the given event occurs
   * on the socket (no timeout).
   */
  override void addOneshotListener(Socket socket, TAsyncEventType eventType,
     TSocketEventListener listener
  ) {
    addOneshotListenerImpl(socket, eventType, null, listener);
  }

  /**
   * Registers a listener which is invoked once when the given event occurs
   * on the socket, or when the timeout expires. A non-positive timeout is
   * treated as "no timeout".
   */
  override void addOneshotListener(Socket socket, TAsyncEventType eventType,
    Duration timeout, TSocketEventListener listener
  ) {
    if (timeout <= dur!"hnsecs"(0)) {
      addOneshotListenerImpl(socket, eventType, null, listener);
    } else {
      // This is not really documented well, but libevent does not require to
      // keep the timeval around after the event was added.
      auto tv = toTimeval(timeout);
      addOneshotListenerImpl(socket, eventType, &tv, listener);
    }
  }

private:
  alias void delegate() Work;

  /**
   * Common implementation of the addOneshotListener() overloads; a null
   * timeout pointer means that no timeout is used.
   */
  void addOneshotListenerImpl(Socket socket, TAsyncEventType eventType,
     const(timeval)* timeout, TSocketEventListener listener
  ) {
    registerOneshotEvent(cast(evutil_socket_t)socket.handle,
      libeventEventType(eventType), assumeNothrow(&socketCallback), timeout,
      listener);
  }

  /**
   * Registers a one-time libevent event, passing a C heap copy of payload as
   * the callback argument. The callback is responsible for releasing that
   * copy again (see freePayload()).
   */
  void registerOneshotEvent(T)(evutil_socket_t fd, short type,
    event_callback_fn callback, const(timeval)* timeout, T payload
  ) {
    // Create a copy of the payload on the C heap.
    auto payloadMem = malloc(payload.sizeof);
    if (!payloadMem) onOutOfMemoryError();
    (cast(T*)payloadMem)[0 .. 1] = payload;
    // The payload may reference GC-managed memory (e.g. a delegate context),
    // so the GC has to scan it for as long as it is alive.
    GC.addRange(payloadMem, payload.sizeof);

    auto result = event_base_once(eventBase_, fd, type, callback,
      payloadMem, timeout);

    // Assuming that we didn't get our arguments wrong above, the only other
    // situation in which event_base_once can fail is when it can't allocate
    // memory.
    if (result != 0) {
      // The callback will never be invoked in this case, so the payload has
      // to be released here to avoid leaking it (and its GC range).
      freePayload(payloadMem);
      onOutOfMemoryError();
    }
  }

  /**
   * Releases a payload copy created by registerOneshotEvent().
   */
  static void freePayload(void* payloadMem) {
    GC.removeRange(payloadMem);
    // Note: Do not call destroy() on the pointer here – for pointer types, it
    // merely resets the passed variable to null, which would turn the free()
    // call into a no-op and leak the memory.
    free(payloadMem);
  }

  /// The kinds of messages sent over the control socket pair.
  enum MsgType : ubyte {
    SHUTDOWN,
    WORK,
    CANCEL
  }

  /// A message sent over the control socket pair to the worker thread.
  struct ControlMsg {
    MsgType type;
    Work work;
    TAsyncTransport transport;
  }

  /**
   * Starts the worker thread if it is not already running.
   */
  void ensureWorkerThreadRunning() {
    // Technically, only half barriers would be required here, but adding the
    // argument seems to trigger a DMD template argument deduction @@BUG@@.
    if (!atomicLoad(*(cast(shared)&workerThread_))) {
      synchronized (this) {
        if (!workerThread_) {
          auto thread = new Thread({ event_base_loop(eventBase_, 0); });
          thread.start();
          atomicStore(*(cast(shared)&workerThread_), cast(shared)thread);
        }
      }
    }
  }

  /**
   * Sends a control message to the worker thread.
   *
   * Throws: TException if the message could not be sent as a whole.
   */
  void sendControlMsg(const(ControlMsg) msg) {
    auto result = controlSendSocket_.send((&msg)[0 .. 1]);
    enum size = msg.sizeof;
    enforce(result == size, new TException(text(
      "Sending control message of type ", msg.type, " failed (", result,
      " bytes instead of ", size, " transmitted).")));
  }

  /**
   * Receives messages from the control message socket and acts on them. Called
   * from the worker thread.
   */
  void receiveControlMsg() {
    // Read as many new work items off the socket as possible (at least one
    // should be available, as we got notified by libevent).
    ControlMsg msg;
    ptrdiff_t bytesRead;
    while (true) {
      bytesRead = controlReceiveSocket_.receive(cast(ubyte[])((&msg)[0 .. 1]));

      if (bytesRead < 0) {
        auto errno = getSocketErrno();
        if (errno != WOULD_BLOCK_ERRNO) {
          logError("Reading control message, some work item will possibly " ~
            "never be executed: %s", socketErrnoString(errno));
        }
      }
      if (bytesRead != msg.sizeof) break;

      // Everything went fine, we received a new control message.
      final switch (msg.type) {
        case MsgType.SHUTDOWN:
          // The message was just intended to wake us up for shutdown.
          break;

        case MsgType.CANCEL:
          // When processing a cancellation, we must not touch the first item,
          // since it is already being processed.
          //
          // The queue is looked up using `in`, because there might not be a
          // queue for the transport (e.g. if the cancellation arrives after
          // stop() has cleared the queues), in which case indexing would
          // throw a RangeError.
          auto cancelQueue = msg.transport in workQueues_;
          if (cancelQueue !is null && (*cancelQueue).length > 0) {
            auto items = *cancelQueue;
            auto pending = items[1 .. $];
            immutable pendingCount = pending.length;
            auto kept = removeEqual(pending, msg.work);
            immutable removedCount = pendingCount - kept.length;
            workQueues_[msg.transport] = [items[0]] ~ kept;

            // The removed items will never be executed, so they must no
            // longer be counted as queued – otherwise, stop() could end up
            // waiting for them forever.
            foreach (_; 0 .. removedCount) decrementQueuedCount();
          }
          break;

        case MsgType.WORK:
          // Now that the work item is back in the D world, we don't need the
          // extra GC root for the context pointer anymore (see execute()).
          GC.removeRoot(msg.work.ptr);

          // Add the work item to the queue and execute it.
          auto queue = msg.transport in workQueues_;
          if (queue is null || (*queue).empty) {
            // If the queue is empty, add the new work item to the queue as well,
            // but immediately start executing it.
            workQueues_[msg.transport] = [msg.work];
            executeWork(msg.transport, msg.work);
          } else {
            (*queue) ~= msg.work;
          }
          break;
      }
    }

    // If the last read was successful, but didn't read enough bytes, we got
    // a problem.
    if (bytesRead > 0) {
      logError("Unexpected partial control message read (%s byte(s) " ~
        "instead of %s), some work item will possibly never be executed.",
        bytesRead, msg.sizeof);
    }
  }

  /**
   * Executes the given work item and all others enqueued for the same
   * transport in a new fiber. Called from the worker thread.
   */
  void executeWork(TAsyncTransport transport, Work work) {
    (new Fiber({
      auto item = work;
      while (true) {
        try {
          // Execute the actual work. It will possibly add listeners to the
          // event loop and yield away if it has to wait for blocking
          // operations. It is quite possible that another fiber will modify
          // the work queue for the current transport.
          item();
        } catch (Exception e) {
          // This should never happen, just to be sure the worker thread
          // doesn't stop working in mysterious ways because of an unhandled
          // exception.
          logError("Exception thrown by work item: %s", e);
        }

        // Remove the item from the work queue.
        // Note: Due to the value semantics of array slices, we have to
        // re-lookup this on every iteration. This could be solved, but I'd
        // rather replace this directly with a queue type once one becomes
        // available in Phobos.
        auto queue = workQueues_[transport];
        assert(queue.front == item);
        queue.popFront();
        workQueues_[transport] = queue;

        // Now that the work item is done, no longer count it as queued.
        decrementQueuedCount();

        if (queue.empty) break;

        // If the queue is not empty, execute the next waiting item.
        item = queue.front;
      }
    })).call();
  }

  /**
   * Increments the amount of queued items.
   */
  void incrementQueuedCount() {
    synchronized (queuedCountMutex_) {
      ++queuedCount_;
    }
  }

  /**
   * Decrements the amount of queued items, notifying all threads waiting on
   * zeroQueuedCondition_ if it reaches zero.
   */
  void decrementQueuedCount() {
    synchronized (queuedCountMutex_) {
      assert(queuedCount_ > 0);
      --queuedCount_;
      if (queuedCount_ == 0) {
        zeroQueuedCondition_.notifyAll();
      }
    }
  }

  /**
   * libevent callback for the control message event; managerThis is the
   * manager instance that was registered in the constructor.
   */
  static extern(C) void controlMsgReceiveCallback(evutil_socket_t, short,
    void *managerThis
  ) {
    (cast(TLibeventAsyncManager)managerThis).receiveControlMsg();
  }

  /**
   * libevent callback for oneshot socket events. arg points to the
   * TSocketEventListener copy created by registerOneshotEvent(), which is
   * released after the listener has been invoked.
   */
  static extern(C) void socketCallback(evutil_socket_t, short flags,
    void *arg
  ) {
    auto reason = (flags & EV_TIMEOUT) ? TAsyncEventReason.TIMED_OUT :
      TAsyncEventReason.NORMAL;
    (*(cast(TSocketEventListener*)arg))(reason);
    freePayload(arg);
  }

  /**
   * libevent callback for delay() timers. arg points to the delegate copy
   * created by registerOneshotEvent(), which is released after the delegate
   * has been invoked.
   */
  static extern(C) void delayCallback(evutil_socket_t, short flags,
    void *arg
  ) {
    assert(flags & EV_TIMEOUT);
    (*(cast(void delegate()*)arg))();
    freePayload(arg);
  }

  /// The thread running the libevent loop, null if it is not running.
  Thread workerThread_;

  event_base* eventBase_;

  /// The socket used for receiving control messages in the event loop. Paired
  /// with controlSendSocket_. SHUTDOWN messages do not carry a work item and
  /// are only used to wake up the worker thread.
  Socket controlReceiveSocket_;
  event* controlReceiveEvent_;

  /// The socket used to send new work items to the event loop. It is
  /// expected that work items can always be read at once from it, i.e. that
  /// there will never be short reads.
  Socket controlSendSocket_;

  /// Queued up work delegates for async transports. This also includes
  /// currently active ones, they are removed from the queue on completion,
  /// which is relied on by the control message receive fiber (the main one)
  /// to decide whether to immediately start executing items or not.
  // TODO: This should really be of some queue type, not an array slice, but
  // std.container doesn't have anything.
  Work[][TAsyncTransport] workQueues_;

  /// The total number of work items not yet finished (queued and currently
  /// executed) and delays not yet executed.
  uint queuedCount_;

  /// Protects queuedCount_.
  Mutex queuedCountMutex_;

  /// Triggered when queuedCount_ reaches zero, protected by queuedCountMutex_.
  Condition zeroQueuedCondition_;
}

private {
  /**
   * Converts a Duration into a timeval by splitting it into its seconds and
   * microseconds parts.
   */
  timeval toTimeval(const(Duration) dur) {
    timeval tv;
    dur.split!("seconds", "usecs")(tv.tv_sec, tv.tv_usec);
    return tv;
  }

  /**
   * Returns the libevent flags combination to represent a given TAsyncEventType.
   */
  short libeventEventType(TAsyncEventType type) {
    final switch (type) {
      case TAsyncEventType.READ:
        return EV_READ | EV_ET;
      case TAsyncEventType.WRITE:
        return EV_WRITE | EV_ET;
    }
  }
}