/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module thrift.server.taskpool;

import core.sync.condition;
import core.sync.mutex;
import std.exception : enforce;
import std.parallelism;
import std.variant : Variant;
import thrift.base;
import thrift.protocol.base;
import thrift.protocol.processor;
import thrift.server.base;
import thrift.server.transport.base;
import thrift.transport.base;
import thrift.util.cancellation;

/**
 * A server which dispatches client requests to a std.parallelism TaskPool.
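 *
 * Example (an illustrative sketch, not from the original sources; the
 * processor/handler types and the port are hypothetical, and the factory
 * types are assumed to be imported from the usual Thrift D modules):
 * ---
 * auto processor = new MyServiceProcessor(new MyServiceHandler);
 * auto server = new TTaskPoolServer(processor,
 *   new TServerSocket(9090),
 *   new TBufferedTransportFactory,
 *   new TBinaryProtocolFactory!());
 * server.serve();
 * ---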
 */
class TTaskPoolServer : TServer {
  ///
  this(
    TProcessor processor,
    TServerTransport serverTransport,
    TTransportFactory transportFactory,
    TProtocolFactory protocolFactory,
    TaskPool taskPool = null
  ) {
    this(processor, serverTransport, transportFactory, transportFactory,
      protocolFactory, protocolFactory, taskPool);
  }

  ///
  this(
    TProcessorFactory processorFactory,
    TServerTransport serverTransport,
    TTransportFactory transportFactory,
    TProtocolFactory protocolFactory,
    TaskPool taskPool = null
  ) {
    this(processorFactory, serverTransport, transportFactory, transportFactory,
      protocolFactory, protocolFactory, taskPool);
  }

  ///
  this(
    TProcessor processor,
    TServerTransport serverTransport,
    TTransportFactory inputTransportFactory,
    TTransportFactory outputTransportFactory,
    TProtocolFactory inputProtocolFactory,
    TProtocolFactory outputProtocolFactory,
    TaskPool taskPool = null
  ) {
    this(new TSingletonProcessorFactory(processor), serverTransport,
      inputTransportFactory, outputTransportFactory,
      inputProtocolFactory, outputProtocolFactory, taskPool);
  }

  ///
  this(
    TProcessorFactory processorFactory,
    TServerTransport serverTransport,
    TTransportFactory inputTransportFactory,
    TTransportFactory outputTransportFactory,
    TProtocolFactory inputProtocolFactory,
    TProtocolFactory outputProtocolFactory,
    TaskPool taskPool = null
  ) {
    super(processorFactory, serverTransport, inputTransportFactory,
      outputTransportFactory, inputProtocolFactory, outputProtocolFactory);

    if (taskPool) {
      this.taskPool = taskPool;
    } else {
      auto ptp = std.parallelism.taskPool;
      if (ptp.size > 0) {
        taskPool_ = ptp;
      } else {
        // If the global task pool is empty (the default on a single-core
        // machine), create a new one with a single worker thread. The
        // rationale is to avoid an application that worked fine without an
        // explicitly set task pool on multi-core developer boxes suddenly
        // failing on a single-core user machine.
        taskPool_ = new TaskPool(1);
        taskPool_.isDaemon = true;
      }
    }
  }

  override void serve(TCancellation cancellation = null) {
    serverTransport_.listen();

    if (eventHandler) eventHandler.preServe();

    auto queueState = QueueState();

    while (true) {
      // Check if we can still handle more connections.
      if (maxActiveConns) {
        synchronized (queueState.mutex) {
          while (queueState.activeConns >= maxActiveConns) {
            queueState.connClosed.wait();
          }
        }
      }

      TTransport client;
      TTransport inputTransport;
      TTransport outputTransport;
      TProtocol inputProtocol;
      TProtocol outputProtocol;

      try {
        client = serverTransport_.accept(cancellation);
        scope(failure) client.close();

        inputTransport = inputTransportFactory_.getTransport(client);
        scope(failure) inputTransport.close();

        outputTransport = outputTransportFactory_.getTransport(client);
        scope(failure) outputTransport.close();

        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
      } catch (TCancelledException tce) {
        break;
      } catch (TTransportException ttx) {
        logError("TServerTransport failed on accept: %s", ttx);
        continue;
      } catch (TException tx) {
        logError("Caught TException on accept: %s", tx);
        continue;
      }

      auto info = TConnectionInfo(inputProtocol, outputProtocol, client);
      auto processor = processorFactory_.getProcessor(info);

      synchronized (queueState.mutex) {
        ++queueState.activeConns;
      }
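      // Hand the connection off to a pool thread; worker() decrements
      // activeConns and signals connClosed once the connection is done.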
      taskPool_.put(task!worker(queueState, client, inputProtocol,
        outputProtocol, processor, eventHandler));
    }

    // First, stop accepting new connections.
    try {
      serverTransport_.close();
    } catch (TServerTransportException e) {
      logError("Server transport failed to close: %s", e);
    }

    // Then, wait until all active connections are finished.
    synchronized (queueState.mutex) {
      while (queueState.activeConns > 0) {
        queueState.connClosed.wait();
      }
    }
  }

  /**
   * Sets the task pool to use.
   *
   * By default, the global std.parallelism taskPool instance is used, which
   * might not be appropriate for many applications, e.g. where tuning the
   * number of worker threads is desired. (On single-core systems, a private
   * task pool with a single thread is used by default, since the global
   * taskPool instance has no worker threads then.)
   *
   * Note: TTaskPoolServer expects that tasks are never dropped from the pool,
   * e.g. by calling TaskPool.close() while there are still tasks in the
   * queue. If this happens, serve() will never return.
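   *
   * Example (an illustrative sketch; the pool size of 4 is an arbitrary
   * assumption, not a recommendation):
   * ---
   * auto pool = new TaskPool(4);
   * pool.isDaemon = true;
   * server.taskPool = pool;
   * ---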
   */
  void taskPool(TaskPool pool) @property {
    enforce(pool !is null, "Cannot use a null task pool.");
    enforce(pool.size > 0, "Cannot use a task pool with no worker threads.");
    taskPool_ = pool;
  }

  /**
   * The maximum number of client connections open at the same time. Zero for
   * no limit, which is the default.
   *
   * If this limit is reached, no new clients are accept()ed from the server
   * transport until an existing connection has been closed.
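   *
   * Example (illustrative; the limit of 64 is an arbitrary assumption):
   * ---
   * server.maxActiveConns = 64;
   * ---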
   */
  size_t maxActiveConns;

protected:
  TaskPool taskPool_;
}

// Cannot be private as worker has to be passed as alias parameter to
// another module.
// private {
/*
 * The state of the »connection queue«, i.e. used for keeping track of how
 * many client connections are currently processed.
 */
struct QueueState {
  /// Protects the queue state.
  Mutex mutex;

  /// The number of active connections (from the time they are accept()ed
  /// until they are closed when the worker task finishes).
  size_t activeConns;

  /// Signals that the number of active connections has been decreased, i.e.
  /// that a connection has been closed.
  Condition connClosed;

  /// Returns an initialized instance.
  static QueueState opCall() {
    QueueState q;
    q.mutex = new Mutex;
    q.connClosed = new Condition(q.mutex);
    return q;
  }
}

void worker(ref QueueState queueState, TTransport client,
  TProtocol inputProtocol, TProtocol outputProtocol,
  TProcessor processor, TServerEventHandler eventHandler)
{
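  // However the task ends, decrement the active connection count and wake
  // anyone blocked in serve(), whether on the maxActiveConns limit or in
  // the final drain before returning.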
  scope (exit) {
    synchronized (queueState.mutex) {
      assert(queueState.activeConns > 0);
      --queueState.activeConns;
      queueState.connClosed.notifyAll();
    }
  }

  Variant connectionContext;
  if (eventHandler) {
    connectionContext =
      eventHandler.createContext(inputProtocol, outputProtocol);
  }
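
  // Keep handling requests until process() reports a fundamental failure or
  // peek() shows the client has stopped sending, then fall through to close
  // the connection.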
  try {
    while (true) {
      if (eventHandler) {
        eventHandler.preProcess(connectionContext, client);
      }

      if (!processor.process(inputProtocol, outputProtocol,
        connectionContext) || !inputProtocol.transport.peek()
      ) {
        // Something went fundamentally wrong or there is nothing more to
        // process; close the connection.
        break;
      }
    }
  } catch (TTransportException ttx) {
    if (ttx.type() != TTransportException.Type.END_OF_FILE) {
      logError("Client died unexpectedly: %s", ttx);
    }
  } catch (Exception e) {
    logError("Uncaught exception: %s", e);
  }

  if (eventHandler) {
    eventHandler.deleteContext(connectionContext, inputProtocol,
      outputProtocol);
  }

  try {
    inputProtocol.transport.close();
  } catch (TTransportException ttx) {
    logError("Input close failed: %s", ttx);
  }
  try {
    outputProtocol.transport.close();
  } catch (TTransportException ttx) {
    logError("Output close failed: %s", ttx);
  }
  try {
    client.close();
  } catch (TTransportException ttx) {
    logError("Client close failed: %s", ttx);
  }
}
// }

unittest {
  import thrift.internal.test.server;
  testServeCancel!TTaskPoolServer();
}