1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 module thrift.server.taskpool; 20 21 import core.sync.condition; 22 import core.sync.mutex; 23 import std.exception : enforce; 24 import std.parallelism; 25 import std.variant : Variant; 26 import thrift.base; 27 import thrift.protocol.base; 28 import thrift.protocol.processor; 29 import thrift.server.base; 30 import thrift.server.transport.base; 31 import thrift.transport.base; 32 import thrift.util.cancellation; 33 34 /** 35 * A server which dispatches client requests to a std.parallelism TaskPool. 36 */ 37 class TTaskPoolServer : TServer { 38 /// 39 this( 40 TProcessor processor, 41 TServerTransport serverTransport, 42 TTransportFactory transportFactory, 43 TProtocolFactory protocolFactory, 44 TaskPool taskPool = null 45 ) { 46 this(processor, serverTransport, transportFactory, transportFactory, 47 protocolFactory, protocolFactory, taskPool); 48 } 49 50 /// 51 this( 52 TProcessorFactory processorFactory, 53 TServerTransport serverTransport, 54 TTransportFactory transportFactory, 55 TProtocolFactory protocolFactory, 56 TaskPool taskPool = null 57 ) { 58 this(processorFactory, serverTransport, transportFactory, transportFactory, 59 protocolFactory, protocolFactory, taskPool); 60 } 61 62 /// 63 this( 64 TProcessor processor, 65 TServerTransport serverTransport, 66 TTransportFactory inputTransportFactory, 67 TTransportFactory outputTransportFactory, 68 TProtocolFactory inputProtocolFactory, 69 TProtocolFactory outputProtocolFactory, 70 TaskPool taskPool = null 71 ) { 72 this(new TSingletonProcessorFactory(processor), serverTransport, 73 inputTransportFactory, outputTransportFactory, 74 inputProtocolFactory, outputProtocolFactory); 75 } 76 77 /// 78 this( 79 TProcessorFactory processorFactory, 80 TServerTransport serverTransport, 81 TTransportFactory inputTransportFactory, 82 TTransportFactory outputTransportFactory, 83 TProtocolFactory inputProtocolFactory, 84 TProtocolFactory outputProtocolFactory, 85 TaskPool taskPool = null 86 ) { 87 super(processorFactory, serverTransport, inputTransportFactory, 88 outputTransportFactory, inputProtocolFactory, outputProtocolFactory); 89 90 if (taskPool) { 91 this.taskPool = taskPool; 92 } else { 93 auto ptp = std.parallelism.taskPool; 94 if (ptp.size > 0) { 95 taskPool_ = ptp; 96 } else { 97 // If the global task pool is empty (default on a single-core machine), 98 // create a new one with a single worker thread. The rationale for this 99 // is to avoid that an application which worked fine with no task pool 100 // explicitly set on the multi-core developer boxes suddenly fails on a 101 // single-core user machine. 102 taskPool_ = new TaskPool(1); 103 taskPool_.isDaemon = true; 104 } 105 } 106 } 107 108 override void serve(TCancellation cancellation = null) { 109 serverTransport_.listen(); 110 111 if (eventHandler) eventHandler.preServe(); 112 113 auto queueState = QueueState(); 114 115 while (true) { 116 // Check if we can still handle more connections. 117 if (maxActiveConns) { 118 synchronized (queueState.mutex) { 119 while (queueState.activeConns >= maxActiveConns) { 120 queueState.connClosed.wait(); 121 } 122 } 123 } 124 125 TTransport client; 126 TTransport inputTransport; 127 TTransport outputTransport; 128 TProtocol inputProtocol; 129 TProtocol outputProtocol; 130 131 try { 132 client = serverTransport_.accept(cancellation); 133 scope(failure) client.close(); 134 135 inputTransport = inputTransportFactory_.getTransport(client); 136 scope(failure) inputTransport.close(); 137 138 outputTransport = outputTransportFactory_.getTransport(client); 139 scope(failure) outputTransport.close(); 140 141 inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); 142 outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); 143 } catch (TCancelledException tce) { 144 break; 145 } catch (TTransportException ttx) { 146 logError("TServerTransport failed on accept: %s", ttx); 147 continue; 148 } catch (TException tx) { 149 logError("Caught TException on accept: %s", tx); 150 continue; 151 } 152 153 auto info = TConnectionInfo(inputProtocol, outputProtocol, client); 154 auto processor = processorFactory_.getProcessor(info); 155 156 synchronized (queueState.mutex) { 157 ++queueState.activeConns; 158 } 159 taskPool_.put(task!worker(queueState, client, inputProtocol, 160 outputProtocol, processor, eventHandler)); 161 } 162 163 // First, stop accepting new connections. 164 try { 165 serverTransport_.close(); 166 } catch (TServerTransportException e) { 167 logError("Server transport failed to close: %s", e); 168 } 169 170 // Then, wait until all active connections are finished. 171 synchronized (queueState.mutex) { 172 while (queueState.activeConns > 0) { 173 queueState.connClosed.wait(); 174 } 175 } 176 } 177 178 /** 179 * Sets the task pool to use. 180 * 181 * By default, the global std.parallelism taskPool instance is used, which 182 * might not be appropriate for many applications, e.g. where tuning the 183 * number of worker threads is desired. (On single-core systems, a private 184 * task pool with a single thread is used by default, since the global 185 * taskPool instance has no worker threads then.) 186 * 187 * Note: TTaskPoolServer expects that tasks are never dropped from the pool, 188 * e.g. by calling TaskPool.close() while there are still tasks in the 189 * queue. If this happens, serve() will never return. 190 */ 191 void taskPool(TaskPool pool) @property { 192 enforce(pool !is null, "Cannot use a null task pool."); 193 enforce(pool.size > 0, "Cannot use a task pool with no worker threads."); 194 taskPool_ = pool; 195 } 196 197 /** 198 * The maximum number of client connections open at the same time. Zero for 199 * no limit, which is the default. 200 * 201 * If this limit is reached, no clients are accept()ed from the server 202 * transport any longer until another connection has been closed again. 203 */ 204 size_t maxActiveConns; 205 206 protected: 207 TaskPool taskPool_; 208 } 209 210 // Cannot be private as worker has to be passed as alias parameter to 211 // another module. 212 // private { 213 /* 214 * The state of the »connection queue«, i.e. used for keeping track of how 215 * many client connections are currently processed. 216 */ 217 struct QueueState { 218 /// Protects the queue state. 219 Mutex mutex; 220 221 /// The number of active connections (from the time they are accept()ed 222 /// until they are closed when the worked task finishes). 223 size_t activeConns; 224 225 /// Signals that the number of active connections has been decreased, i.e. 226 /// that a connection has been closed. 227 Condition connClosed; 228 229 /// Returns an initialized instance. 230 static QueueState opCall() { 231 QueueState q; 232 q.mutex = new Mutex; 233 q.connClosed = new Condition(q.mutex); 234 return q; 235 } 236 } 237 238 void worker(ref QueueState queueState, TTransport client, 239 TProtocol inputProtocol, TProtocol outputProtocol, 240 TProcessor processor, TServerEventHandler eventHandler) 241 { 242 scope (exit) { 243 synchronized (queueState.mutex) { 244 assert(queueState.activeConns > 0); 245 --queueState.activeConns; 246 queueState.connClosed.notifyAll(); 247 } 248 } 249 250 Variant connectionContext; 251 if (eventHandler) { 252 connectionContext = 253 eventHandler.createContext(inputProtocol, outputProtocol); 254 } 255 256 try { 257 while (true) { 258 if (eventHandler) { 259 eventHandler.preProcess(connectionContext, client); 260 } 261 262 if (!processor.process(inputProtocol, outputProtocol, 263 connectionContext) || !inputProtocol.transport.peek() 264 ) { 265 // Something went fundamentlly wrong or there is nothing more to 266 // process, close the connection. 267 break; 268 } 269 } 270 } catch (TTransportException ttx) { 271 if (ttx.type() != TTransportException.Type.END_OF_FILE) { 272 logError("Client died unexpectedly: %s", ttx); 273 } 274 } catch (Exception e) { 275 logError("Uncaught exception: %s", e); 276 } 277 278 if (eventHandler) { 279 eventHandler.deleteContext(connectionContext, inputProtocol, 280 outputProtocol); 281 } 282 283 try { 284 inputProtocol.transport.close(); 285 } catch (TTransportException ttx) { 286 logError("Input close failed: %s", ttx); 287 } 288 try { 289 outputProtocol.transport.close(); 290 } catch (TTransportException ttx) { 291 logError("Output close failed: %s", ttx); 292 } 293 try { 294 client.close(); 295 } catch (TTransportException ttx) { 296 logError("Client close failed: %s", ttx); 297 } 298 } 299 // } 300 301 unittest { 302 import thrift.internal.test.server; 303 testServeCancel!TTaskPoolServer(); 304 }