1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 module thrift.server.threaded; 20 21 import core.thread; 22 import std.variant : Variant; 23 import thrift.base; 24 import thrift.protocol.base; 25 import thrift.protocol.processor; 26 import thrift.server.base; 27 import thrift.server.transport.base; 28 import thrift.transport.base; 29 import thrift.util.cancellation; 30 31 /** 32 * A simple threaded server which spawns a new thread per connection. 33 */ 34 class TThreadedServer : TServer { 35 /// 36 this( 37 TProcessor processor, 38 TServerTransport serverTransport, 39 TTransportFactory transportFactory, 40 TProtocolFactory protocolFactory 41 ) { 42 super(processor, serverTransport, transportFactory, protocolFactory); 43 } 44 45 /// 46 this( 47 TProcessorFactory processorFactory, 48 TServerTransport serverTransport, 49 TTransportFactory transportFactory, 50 TProtocolFactory protocolFactory 51 ) { 52 super(processorFactory, serverTransport, transportFactory, protocolFactory); 53 } 54 55 /// 56 this( 57 TProcessor processor, 58 TServerTransport serverTransport, 59 TTransportFactory inputTransportFactory, 60 TTransportFactory outputTransportFactory, 61 TProtocolFactory inputProtocolFactory, 62 TProtocolFactory outputProtocolFactory 63 ) { 64 super(processor, serverTransport, inputTransportFactory, 65 outputTransportFactory, inputProtocolFactory, outputProtocolFactory); 66 } 67 68 /// 69 this( 70 TProcessorFactory processorFactory, 71 TServerTransport serverTransport, 72 TTransportFactory inputTransportFactory, 73 TTransportFactory outputTransportFactory, 74 TProtocolFactory inputProtocolFactory, 75 TProtocolFactory outputProtocolFactory 76 ) { 77 super(processorFactory, serverTransport, inputTransportFactory, 78 outputTransportFactory, inputProtocolFactory, outputProtocolFactory); 79 } 80 81 override void serve(TCancellation cancellation = null) { 82 try { 83 // Start the server listening 84 serverTransport_.listen(); 85 } catch (TTransportException ttx) { 86 logError("listen() failed: %s", ttx); 87 return; 88 } 89 90 if (eventHandler) eventHandler.preServe(); 91 92 auto workerThreads = new ThreadGroup(); 93 94 while (true) { 95 TTransport client; 96 TTransport inputTransport; 97 TTransport outputTransport; 98 TProtocol inputProtocol; 99 TProtocol outputProtocol; 100 101 try { 102 client = serverTransport_.accept(cancellation); 103 scope(failure) client.close(); 104 105 inputTransport = inputTransportFactory_.getTransport(client); 106 scope(failure) inputTransport.close(); 107 108 outputTransport = outputTransportFactory_.getTransport(client); 109 scope(failure) outputTransport.close(); 110 111 inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); 112 outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); 113 } catch (TCancelledException tce) { 114 break; 115 } catch (TTransportException ttx) { 116 logError("TServerTransport failed on accept: %s", ttx); 117 continue; 118 } catch (TException tx) { 119 logError("Caught TException on accept: %s", tx); 120 continue; 121 } 122 123 auto info = TConnectionInfo(inputProtocol, outputProtocol, client); 124 auto processor = processorFactory_.getProcessor(info); 125 auto worker = new WorkerThread(client, inputProtocol, outputProtocol, 126 processor, eventHandler); 127 workerThreads.add(worker); 128 worker.start(); 129 } 130 131 try { 132 serverTransport_.close(); 133 } catch (TServerTransportException e) { 134 logError("Server transport failed to close: %s", e); 135 } 136 workerThreads.joinAll(); 137 } 138 } 139 140 // The worker thread handling a client connection. 141 private class WorkerThread : Thread { 142 this(TTransport client, TProtocol inputProtocol, TProtocol outputProtocol, 143 TProcessor processor, TServerEventHandler eventHandler) 144 { 145 client_ = client; 146 inputProtocol_ = inputProtocol; 147 outputProtocol_ = outputProtocol; 148 processor_ = processor; 149 eventHandler_ = eventHandler; 150 151 super(&run); 152 } 153 154 void run() { 155 Variant connectionContext; 156 if (eventHandler_) { 157 connectionContext = 158 eventHandler_.createContext(inputProtocol_, outputProtocol_); 159 } 160 161 try { 162 while (true) { 163 if (eventHandler_) { 164 eventHandler_.preProcess(connectionContext, client_); 165 } 166 167 if (!processor_.process(inputProtocol_, outputProtocol_, 168 connectionContext) || !inputProtocol_.transport.peek() 169 ) { 170 // Something went fundamentlly wrong or there is nothing more to 171 // process, close the connection. 172 break; 173 } 174 } 175 } catch (TTransportException ttx) { 176 if (ttx.type() != TTransportException.Type.END_OF_FILE) { 177 logError("Client died unexpectedly: %s", ttx); 178 } 179 } catch (Exception e) { 180 logError("Uncaught exception: %s", e); 181 } 182 183 if (eventHandler_) { 184 eventHandler_.deleteContext(connectionContext, inputProtocol_, 185 outputProtocol_); 186 } 187 188 try { 189 inputProtocol_.transport.close(); 190 } catch (TTransportException ttx) { 191 logError("Input close failed: %s", ttx); 192 } 193 try { 194 outputProtocol_.transport.close(); 195 } catch (TTransportException ttx) { 196 logError("Output close failed: %s", ttx); 197 } 198 try { 199 client_.close(); 200 } catch (TTransportException ttx) { 201 logError("Client close failed: %s", ttx); 202 } 203 } 204 205 private: 206 TTransport client_; 207 TProtocol inputProtocol_; 208 TProtocol outputProtocol_; 209 TProcessor processor_; 210 TServerEventHandler eventHandler_; 211 } 212 213 unittest { 214 import thrift.internal.test.server; 215 testServeCancel!TThreadedServer(); 216 } 217