1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.server.taskpool;
20 
21 import core.sync.condition;
22 import core.sync.mutex;
23 import std.exception : enforce;
24 import std.parallelism;
25 import std.variant : Variant;
26 import thrift.base;
27 import thrift.protocol.base;
28 import thrift.protocol.processor;
29 import thrift.server.base;
30 import thrift.server.transport.base;
31 import thrift.transport.base;
32 import thrift.util.cancellation;
33 
34 /**
35  * A server which dispatches client requests to a std.parallelism TaskPool.
36  */
class TTaskPoolServer : TServer {
  /**
   * Creates a server for a single processor, using the same transport
   * factory and the same protocol factory for both input and output.
   *
   * Params:
   *   processor = The processor handling the requests.
   *   serverTransport = The transport clients are accepted from.
   *   transportFactory = Used for both the input and the output transport.
   *   protocolFactory = Used for both the input and the output protocol.
   *   taskPool = The task pool to dispatch connections to, or null to use
   *     the default (see the taskPool property).
   */
  this(
    TProcessor processor,
    TServerTransport serverTransport,
    TTransportFactory transportFactory,
    TProtocolFactory protocolFactory,
    TaskPool taskPool = null
  ) {
    this(processor, serverTransport, transportFactory, transportFactory,
      protocolFactory, protocolFactory, taskPool);
  }

  /**
   * Creates a server for a processor factory, using the same transport
   * factory and the same protocol factory for both input and output.
   *
   * Params:
   *   processorFactory = Asked for a processor for each accepted connection.
   *   serverTransport = The transport clients are accepted from.
   *   transportFactory = Used for both the input and the output transport.
   *   protocolFactory = Used for both the input and the output protocol.
   *   taskPool = The task pool to dispatch connections to, or null to use
   *     the default (see the taskPool property).
   */
  this(
    TProcessorFactory processorFactory,
    TServerTransport serverTransport,
    TTransportFactory transportFactory,
    TProtocolFactory protocolFactory,
    TaskPool taskPool = null
  ) {
    this(processorFactory, serverTransport, transportFactory, transportFactory,
      protocolFactory, protocolFactory, taskPool);
  }

  /**
   * Creates a server for a single processor with separate input/output
   * transport and protocol factories. The processor is wrapped in a
   * TSingletonProcessorFactory.
   *
   * Params:
   *   processor = The processor handling the requests.
   *   serverTransport = The transport clients are accepted from.
   *   inputTransportFactory = Creates the input transport for a client.
   *   outputTransportFactory = Creates the output transport for a client.
   *   inputProtocolFactory = Creates the input protocol for a client.
   *   outputProtocolFactory = Creates the output protocol for a client.
   *   taskPool = The task pool to dispatch connections to, or null to use
   *     the default (see the taskPool property).
   */
  this(
    TProcessor processor,
    TServerTransport serverTransport,
    TTransportFactory inputTransportFactory,
    TTransportFactory outputTransportFactory,
    TProtocolFactory inputProtocolFactory,
    TProtocolFactory outputProtocolFactory,
    TaskPool taskPool = null
  ) {
    // The task pool must be forwarded explicitly; otherwise a pool supplied
    // by the caller would silently be replaced by the default one.
    this(new TSingletonProcessorFactory(processor), serverTransport,
      inputTransportFactory, outputTransportFactory,
      inputProtocolFactory, outputProtocolFactory, taskPool);
  }

  /**
   * Creates a server for a processor factory with separate input/output
   * transport and protocol factories. All other constructors end up here.
   *
   * If taskPool is non-null, it is installed via the taskPool property
   * (which rejects pools without worker threads). Otherwise the global
   * std.parallelism task pool is used if it has worker threads, and a
   * private single-threaded daemon pool is created if it has none.
   *
   * Params:
   *   processorFactory = Asked for a processor for each accepted connection.
   *   serverTransport = The transport clients are accepted from.
   *   inputTransportFactory = Creates the input transport for a client.
   *   outputTransportFactory = Creates the output transport for a client.
   *   inputProtocolFactory = Creates the input protocol for a client.
   *   outputProtocolFactory = Creates the output protocol for a client.
   *   taskPool = The task pool to dispatch connections to, or null to use
   *     the default described above.
   */
  this(
    TProcessorFactory processorFactory,
    TServerTransport serverTransport,
    TTransportFactory inputTransportFactory,
    TTransportFactory outputTransportFactory,
    TProtocolFactory inputProtocolFactory,
    TProtocolFactory outputProtocolFactory,
    TaskPool taskPool = null
  ) {
    super(processorFactory, serverTransport, inputTransportFactory,
      outputTransportFactory, inputProtocolFactory, outputProtocolFactory);

    if (taskPool) {
      this.taskPool = taskPool;
    } else {
      auto ptp = std.parallelism.taskPool;
      if (ptp.size > 0) {
        taskPool_ = ptp;
      } else {
        // If the global task pool is empty (default on a single-core machine),
        // create a new one with a single worker thread. The rationale for this
        // is to avoid that an application which worked fine with no task pool
        // explicitly set on the multi-core developer boxes suddenly fails on a
        // single-core user machine.
        taskPool_ = new TaskPool(1);
        taskPool_.isDaemon = true;
      }
    }
  }

  /**
   * Accepts client connections and hands each one to the task pool until
   * accepting is cancelled, then closes the server transport and waits for
   * all connections that are still active to finish.
   *
   * If maxActiveConns is non-zero, no further client is accepted while that
   * many connections are active.
   *
   * Params:
   *   cancellation = Passed on to the accept() call of the server transport;
   *     a TCancelledException raised there ends the accept loop.
   */
  override void serve(TCancellation cancellation = null) {
    serverTransport_.listen();

    if (eventHandler) eventHandler.preServe();

    // A single heap-allocated state instance is shared with all worker tasks
    // by pointer. A std.parallelism Task stores its arguments by value, so
    // handing the struct itself to the task would make the worker update a
    // private copy of activeConns, and the counter inspected below would
    // never be decremented. The GC heap is used instead of the stack so that
    // workers keep a valid pointer even if this method is left early by an
    // exception.
    auto queueState = [QueueState()].ptr;

    while (true) {
      // Check if we can still handle more connections.
      if (maxActiveConns) {
        synchronized (queueState.mutex) {
          while (queueState.activeConns >= maxActiveConns) {
            queueState.connClosed.wait();
          }
        }
      }

      TTransport client;
      TTransport inputTransport;
      TTransport outputTransport;
      TProtocol inputProtocol;
      TProtocol outputProtocol;

      try {
        client = serverTransport_.accept(cancellation);
        // Each scope(failure) closes what has been set up so far if one of
        // the following factory calls throws.
        scope(failure) client.close();

        inputTransport = inputTransportFactory_.getTransport(client);
        scope(failure) inputTransport.close();

        outputTransport = outputTransportFactory_.getTransport(client);
        scope(failure) outputTransport.close();

        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
      } catch (TCancelledException tce) {
        // Cancellation requested: leave the accept loop and shut down.
        break;
      } catch (TTransportException ttx) {
        logError("TServerTransport failed on accept: %s", ttx);
        continue;
      } catch (TException tx) {
        logError("Caught TException on accept: %s", tx);
        continue;
      }

      auto info = TConnectionInfo(inputProtocol, outputProtocol, client);
      auto processor = processorFactory_.getProcessor(info);

      // Count the connection before the task is queued; the worker
      // decrements the counter again when it is done.
      synchronized (queueState.mutex) {
        ++queueState.activeConns;
      }
      taskPool_.put(task(&runWorker, queueState, client, inputProtocol,
        outputProtocol, processor, eventHandler));
    }

    // First, stop accepting new connections.
    try {
      serverTransport_.close();
    } catch (TServerTransportException e) {
      logError("Server transport failed to close: %s", e);
    }

    // Then, wait until all active connections are finished.
    synchronized (queueState.mutex) {
      while (queueState.activeConns > 0) {
        queueState.connClosed.wait();
      }
    }
  }

  /**
   * Sets the task pool to use.
   *
   * By default, the global std.parallelism taskPool instance is used, which
   * might not be appropriate for many applications, e.g. where tuning the
   * number of worker threads is desired. (On single-core systems, a private
   * task pool with a single thread is used by default, since the global
   * taskPool instance has no worker threads then.)
   *
   * Note: TTaskPoolServer expects that tasks are never dropped from the pool,
   * e.g. by calling TaskPool.close() while there are still tasks in the
   * queue. If this happens, serve() will never return.
   *
   * Throws: An exception (via enforce) if pool is null or has no worker
   *   threads.
   */
  void taskPool(TaskPool pool) @property {
    enforce(pool !is null, "Cannot use a null task pool.");
    enforce(pool.size > 0, "Cannot use a task pool with no worker threads.");
    taskPool_ = pool;
  }

  /**
   * The maximum number of client connections open at the same time. Zero for
   * no limit, which is the default.
   *
   * If this limit is reached, no clients are accept()ed from the server
   * transport any longer until another connection has been closed again.
   */
  size_t maxActiveConns;

protected:
  /// The task pool client connections are dispatched to.
  TaskPool taskPool_;

private:
  /*
   * Task entry point: forwards to worker(), dereferencing the pointer to the
   * connection state shared with serve(). Passed to the task pool as a
   * function pointer so that the state is not copied into the task.
   */
  static void runWorker(QueueState* queueState, TTransport client,
    TProtocol inputProtocol, TProtocol outputProtocol,
    TProcessor processor, TServerEventHandler eventHandler)
  {
    worker(*queueState, client, inputProtocol, outputProtocol, processor,
      eventHandler);
  }
}
209 
210 // Cannot be private as worker has to be passed as alias parameter to
211 // another module.
212 // private {
213   /*
214    * The state of the »connection queue«, i.e. used for keeping track of how
215    * many client connections are currently processed.
216    */
217   struct QueueState {
218     /// Protects the queue state.
219     Mutex mutex;
220 
221     /// The number of active connections (from the time they are accept()ed
222     /// until they are closed when the worked task finishes).
223     size_t activeConns;
224 
225     /// Signals that the number of active connections has been decreased, i.e.
226     /// that a connection has been closed.
227     Condition connClosed;
228 
229     /// Returns an initialized instance.
230     static QueueState opCall() {
231       QueueState q;
232       q.mutex = new Mutex;
233       q.connClosed = new Condition(q.mutex);
234       return q;
235     }
236   }
237 
238   void worker(ref QueueState queueState, TTransport client,
239     TProtocol inputProtocol, TProtocol outputProtocol,
240     TProcessor processor, TServerEventHandler eventHandler)
241   {
242     scope (exit) {
243       synchronized (queueState.mutex) {
244         assert(queueState.activeConns > 0);
245         --queueState.activeConns;
246         queueState.connClosed.notifyAll();
247       }
248     }
249 
250     Variant connectionContext;
251     if (eventHandler) {
252       connectionContext =
253         eventHandler.createContext(inputProtocol, outputProtocol);
254     }
255 
256     try {
257       while (true) {
258         if (eventHandler) {
259           eventHandler.preProcess(connectionContext, client);
260         }
261 
262         if (!processor.process(inputProtocol, outputProtocol,
263           connectionContext) || !inputProtocol.transport.peek()
264         ) {
265           // Something went fundamentlly wrong or there is nothing more to
266           // process, close the connection.
267           break;
268         }
269       }
270     } catch (TTransportException ttx) {
271       if (ttx.type() != TTransportException.Type.END_OF_FILE) {
272         logError("Client died unexpectedly: %s", ttx);
273       }
274     } catch (Exception e) {
275       logError("Uncaught exception: %s", e);
276     }
277 
278     if (eventHandler) {
279       eventHandler.deleteContext(connectionContext, inputProtocol,
280         outputProtocol);
281     }
282 
283     try {
284       inputProtocol.transport.close();
285     } catch (TTransportException ttx) {
286       logError("Input close failed: %s", ttx);
287     }
288     try {
289       outputProtocol.transport.close();
290     } catch (TTransportException ttx) {
291       logError("Output close failed: %s", ttx);
292     }
293     try {
294       client.close();
295     } catch (TTransportException ttx) {
296       logError("Client close failed: %s", ttx);
297     }
298   }
299 // }
300 
// Runs the serve()/cancellation test helper against TTaskPoolServer.
unittest {
  import thrift.internal.test.server : testServeCancel;

  testServeCancel!TTaskPoolServer();
}