1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 module thrift.util.awaitable; 20 21 import core.sync.condition; 22 import core.sync.mutex; 23 import core.time : Duration; 24 import std.exception : enforce; 25 import std.socket/+ : Socket, socketPair+/; // DMD @@BUG314@@ 26 import thrift.base; 27 28 // To avoid DMD @@BUG6395@@. 29 import thrift.internal.algorithm; 30 31 /** 32 * An event that can occur at some point in the future and which can be 33 * awaited, either by blocking until it occurs, or by registering a callback 34 * delegate. 35 */ 36 interface TAwaitable { 37 /** 38 * Waits until the event occurs. 39 * 40 * Calling wait() for an event that has already occurred is a no-op. 41 */ 42 void wait(); 43 44 /** 45 * Waits until the event occurs or the specified timeout expires. 46 * 47 * Calling wait() for an event that has already occurred is a no-op. 48 * 49 * Returns: Whether the event was triggered before the timeout expired. 50 */ 51 bool wait(Duration timeout); 52 53 /** 54 * Registers a callback that is called if the event occurs. 55 * 56 * The delegate will likely be invoked from a different thread, and is 57 * expected not to perform expensive work as it will usually be invoked 58 * synchronously by the notifying thread. The order in which registered 59 * callbacks are invoked is not specified. 60 * 61 * The callback must never throw, but nothrow semantics are difficult to 62 * enforce, so currently exceptions are just swallowed by 63 * TAwaitable implementations. 64 * 65 * If the event has already occurred, the delegate is immediately executed 66 * in the current thread. 67 */ 68 void addCallback(void delegate() dg); 69 70 /** 71 * Removes a previously added callback. 72 * 73 * Returns: Whether the callback could be found in the list, i.e. whether it 74 * was previously added. 75 */ 76 bool removeCallback(void delegate() dg); 77 } 78 79 /** 80 * A simple TAwaitable event triggered by just calling a trigger() method. 81 */ 82 class TOneshotEvent : TAwaitable { 83 this() { 84 mutex_ = new Mutex; 85 condition_ = new Condition(mutex_); 86 } 87 88 override void wait() { 89 synchronized (mutex_) { 90 while (!triggered_) condition_.wait(); 91 } 92 } 93 94 override bool wait(Duration timeout) { 95 synchronized (mutex_) { 96 if (triggered_) return true; 97 condition_.wait(timeout); 98 return triggered_; 99 } 100 } 101 102 override void addCallback(void delegate() dg) { 103 mutex_.lock(); 104 scope (failure) mutex_.unlock(); 105 106 callbacks_ ~= dg; 107 108 if (triggered_) { 109 mutex_.unlock(); 110 dg(); 111 return; 112 } 113 114 mutex_.unlock(); 115 } 116 117 override bool removeCallback(void delegate() dg) { 118 synchronized (mutex_) { 119 auto oldLength = callbacks_.length; 120 callbacks_ = removeEqual(callbacks_, dg); 121 return callbacks_.length < oldLength; 122 } 123 } 124 125 /** 126 * Triggers the event. 127 * 128 * Any registered event callbacks are executed synchronously before the 129 * function returns. 130 */ 131 void trigger() { 132 synchronized (mutex_) { 133 if (!triggered_) { 134 triggered_ = true; 135 condition_.notifyAll(); 136 foreach (c; callbacks_) c(); 137 } 138 } 139 } 140 141 private: 142 bool triggered_; 143 Mutex mutex_; 144 Condition condition_; 145 void delegate()[] callbacks_; 146 } 147 148 /** 149 * Translates TAwaitable events into dummy messages on a socket that can be 150 * used e.g. to wake up from a select() call. 151 */ 152 final class TSocketNotifier { 153 this() { 154 auto socks = socketPair(); 155 foreach (s; socks) s.blocking = false; 156 sendSocket_ = socks[0]; 157 recvSocket_ = socks[1]; 158 } 159 160 /** 161 * The socket the messages will be sent to. 162 */ 163 Socket socket() @property { 164 return recvSocket_; 165 } 166 167 /** 168 * Atatches the socket notifier to the specified awaitable, causing it to 169 * write a byte to the notification socket when the awaitable callbacks are 170 * invoked. 171 * 172 * If the event has already been triggered, the dummy byte is written 173 * immediately to the socket. 174 * 175 * A socket notifier can only be attached to a single awaitable at a time. 176 * 177 * Throws: TException if the socket notifier is already attached. 178 */ 179 void attach(TAwaitable awaitable) { 180 enforce(!awaitable_, new TException("Already attached.")); 181 awaitable.addCallback(¬ify); 182 awaitable_ = awaitable; 183 } 184 185 /** 186 * Detaches the socket notifier from the awaitable it is currently attached 187 * to. 188 * 189 * Throws: TException if the socket notifier is not currently attached. 190 */ 191 void detach() { 192 enforce(awaitable_, new TException("Not attached.")); 193 194 // Soak up any not currently read notification bytes. 195 ubyte[1] dummy = void; 196 while (recvSocket_.receive(dummy) != Socket.ERROR) {} 197 198 auto couldRemove = awaitable_.removeCallback(¬ify); 199 assert(couldRemove); 200 awaitable_ = null; 201 } 202 203 private: 204 void notify() { 205 ubyte[1] zero; 206 sendSocket_.send(zero); 207 } 208 209 TAwaitable awaitable_; 210 Socket sendSocket_; 211 Socket recvSocket_; 212 }