1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.server.threaded;
20 
21 import core.thread;
22 import std.variant : Variant;
23 import thrift.base;
24 import thrift.protocol.base;
25 import thrift.protocol.processor;
26 import thrift.server.base;
27 import thrift.server.transport.base;
28 import thrift.transport.base;
29 import thrift.util.cancellation;
30 
31 /**
32  * A simple threaded server which spawns a new thread per connection.
33  */
34 class TThreadedServer : TServer {
35   ///
36   this(
37     TProcessor processor,
38     TServerTransport serverTransport,
39     TTransportFactory transportFactory,
40     TProtocolFactory protocolFactory
41   ) {
42     super(processor, serverTransport, transportFactory, protocolFactory);
43   }
44 
45   ///
46   this(
47     TProcessorFactory processorFactory,
48     TServerTransport serverTransport,
49     TTransportFactory transportFactory,
50     TProtocolFactory protocolFactory
51   ) {
52     super(processorFactory, serverTransport, transportFactory, protocolFactory);
53   }
54 
55   ///
56   this(
57     TProcessor processor,
58     TServerTransport serverTransport,
59     TTransportFactory inputTransportFactory,
60     TTransportFactory outputTransportFactory,
61     TProtocolFactory inputProtocolFactory,
62     TProtocolFactory outputProtocolFactory
63   ) {
64     super(processor, serverTransport, inputTransportFactory,
65       outputTransportFactory, inputProtocolFactory, outputProtocolFactory);
66   }
67 
68   ///
69   this(
70     TProcessorFactory processorFactory,
71     TServerTransport serverTransport,
72     TTransportFactory inputTransportFactory,
73     TTransportFactory outputTransportFactory,
74     TProtocolFactory inputProtocolFactory,
75     TProtocolFactory outputProtocolFactory
76   ) {
77     super(processorFactory, serverTransport, inputTransportFactory,
78       outputTransportFactory, inputProtocolFactory, outputProtocolFactory);
79   }
80 
81   override void serve(TCancellation cancellation = null) {
82     try {
83       // Start the server listening
84       serverTransport_.listen();
85     } catch (TTransportException ttx) {
86       logError("listen() failed: %s", ttx);
87       return;
88     }
89 
90     if (eventHandler) eventHandler.preServe();
91 
92     auto workerThreads = new ThreadGroup();
93 
94     while (true) {
95       TTransport client;
96       TTransport inputTransport;
97       TTransport outputTransport;
98       TProtocol inputProtocol;
99       TProtocol outputProtocol;
100 
101       try {
102         client = serverTransport_.accept(cancellation);
103         scope(failure) client.close();
104 
105         inputTransport = inputTransportFactory_.getTransport(client);
106         scope(failure) inputTransport.close();
107 
108         outputTransport = outputTransportFactory_.getTransport(client);
109         scope(failure) outputTransport.close();
110 
111         inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
112         outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
113       } catch (TCancelledException tce) {
114         break;
115       } catch (TTransportException ttx) {
116         logError("TServerTransport failed on accept: %s", ttx);
117         continue;
118       } catch (TException tx) {
119         logError("Caught TException on accept: %s", tx);
120         continue;
121       }
122 
123       auto info = TConnectionInfo(inputProtocol, outputProtocol, client);
124       auto processor = processorFactory_.getProcessor(info);
125       auto worker = new WorkerThread(client, inputProtocol, outputProtocol,
126         processor, eventHandler);
127       workerThreads.add(worker);
128       worker.start();
129     }
130 
131     try {
132       serverTransport_.close();
133     } catch (TServerTransportException e) {
134       logError("Server transport failed to close: %s", e);
135     }
136     workerThreads.joinAll();
137   }
138 }
139 
140 // The worker thread handling a client connection.
141 private class WorkerThread : Thread {
142   this(TTransport client, TProtocol inputProtocol, TProtocol outputProtocol,
143     TProcessor processor, TServerEventHandler eventHandler)
144   {
145     client_ = client;
146     inputProtocol_ = inputProtocol;
147     outputProtocol_ = outputProtocol;
148     processor_ = processor;
149     eventHandler_ = eventHandler;
150 
151     super(&run);
152   }
153 
154   void run() {
155     Variant connectionContext;
156     if (eventHandler_) {
157       connectionContext =
158         eventHandler_.createContext(inputProtocol_, outputProtocol_);
159     }
160 
161     try {
162       while (true) {
163         if (eventHandler_) {
164           eventHandler_.preProcess(connectionContext, client_);
165         }
166 
167         if (!processor_.process(inputProtocol_, outputProtocol_,
168           connectionContext) || !inputProtocol_.transport.peek()
169         ) {
170           // Something went fundamentlly wrong or there is nothing more to
171           // process, close the connection.
172           break;
173         }
174       }
175     } catch (TTransportException ttx) {
176       if (ttx.type() != TTransportException.Type.END_OF_FILE) {
177         logError("Client died unexpectedly: %s", ttx);
178       }
179     } catch (Exception e) {
180       logError("Uncaught exception: %s", e);
181     }
182 
183     if (eventHandler_) {
184       eventHandler_.deleteContext(connectionContext, inputProtocol_,
185         outputProtocol_);
186     }
187 
188     try {
189       inputProtocol_.transport.close();
190     } catch (TTransportException ttx) {
191       logError("Input close failed: %s", ttx);
192     }
193     try {
194       outputProtocol_.transport.close();
195     } catch (TTransportException ttx) {
196       logError("Output close failed: %s", ttx);
197     }
198     try {
199       client_.close();
200     } catch (TTransportException ttx) {
201       logError("Client close failed: %s", ttx);
202     }
203   }
204 
205 private:
206   TTransport client_;
207   TProtocol inputProtocol_;
208   TProtocol outputProtocol_;
209   TProcessor processor_;
210   TServerEventHandler eventHandler_;
211 }
212 
213 unittest {
214   import thrift.internal.test.server;
215   testServeCancel!TThreadedServer();
216 }
217