1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 module thrift.codegen.async_client; 20 21 import std.conv : text, to; 22 import std.traits : ParameterStorageClass, ParameterStorageClassTuple, 23 ParameterTypeTuple, ReturnType; 24 import thrift.base; 25 import thrift.async.base; 26 import thrift.codegen.base; 27 import thrift.codegen.client; 28 import thrift.internal.codegen; 29 import thrift.internal.ctfe; 30 import thrift.protocol.base; 31 import thrift.transport.base; 32 import thrift.util.cancellation; 33 import thrift.util.future; 34 35 /** 36 * Asynchronous Thrift service client which returns the results as TFutures an 37 * uses a TAsyncManager to perform the actual work. 38 * 39 * TAsyncClientBase serves as a supertype for all TAsyncClients for the same 40 * service, which might be instantiated with different concrete protocol types 41 * (there is no covariance for template type parameters), and extends 42 * TFutureInterface!Interface. If Interface is derived from another service 43 * BaseInterface, it also extends TAsyncClientBase!BaseInterface. 44 * 45 * TAsyncClient implements TAsyncClientBase and offers two constructors with 46 * the following signatures: 47 * --- 48 * this(TAsyncTransport trans, TTransportFactory tf, TProtocolFactory pf); 49 * this(TAsyncTransport trans, TTransportFactory itf, TTransportFactory otf, 50 * TProtocolFactory ipf, TProtocolFactory opf); 51 * --- 52 * 53 * Again, if Interface represents a derived Thrift service, 54 * TAsyncClient!Interface is also derived from TAsyncClient!BaseInterface. 55 * 56 * TAsyncClient can exclusively be used with TAsyncTransports, as it needs to 57 * access the associated TAsyncManager. To set up any wrapper transports 58 * (e.g. buffered, framed) on top of it and to instanciate the protocols to use, 59 * TTransportFactory and TProtocolFactory instances are passed to the 60 * constructors – the three argument constructor is a shortcut if the same 61 * transport and protocol are to be used for both input and output, which is 62 * the most common case. 63 * 64 * If the same transport factory is passed for both input and output transports, 65 * only a single wrapper transport will be created and used for both directions. 66 * This allows easy implementation of protocols like SSL. 67 * 68 * Just as TClient does, TAsyncClient also takes two optional template 69 * arguments which can be used for specifying the actual TProtocol 70 * implementation used for optimization purposes, as virtual calls can 71 * completely be eliminated then. If the actual types of the protocols 72 * instantiated by the factories used does not match the ones statically 73 * specified in the template parameters, a TException is thrown during 74 * construction. 75 * 76 * Example: 77 * --- 78 * // A simple Thrift service. 79 * interface Foo { int foo(); } 80 * 81 * // Create a TAsyncSocketManager – thrift.async.libevent is used for this 82 * // example. 83 * auto manager = new TLibeventAsyncManager; 84 * 85 * // Set up an async transport to use. 86 * auto socket = new TAsyncSocket(manager, host, port); 87 * 88 * // Create a client instance. 89 * auto client = new TAsyncClient!Foo( 90 * socket, 91 * new TBufferedTransportFactory, // Wrap the socket in a TBufferedTransport. 92 * new TBinaryProtocolFactory!() // Use the Binary protocol. 93 * ); 94 * 95 * // Call foo and use the returned future. 96 * auto result = client.foo(); 97 * pragma(msg, typeof(result)); // TFuture!int 98 * int resultValue = result.waitGet(); // Waits until the result is available. 99 * --- 100 */ 101 interface TAsyncClientBase(Interface) if (isBaseService!Interface) : 102 TFutureInterface!Interface 103 { 104 /** 105 * The underlying TAsyncTransport used by this client instance. 106 */ 107 TAsyncTransport transport() @property; 108 } 109 110 /// Ditto 111 interface TAsyncClientBase(Interface) if (isDerivedService!Interface) : 112 TAsyncClientBase!(BaseService!Interface), TFutureInterface!Interface 113 {} 114 115 /// Ditto 116 template TAsyncClient(Interface, InputProtocol = TProtocol, OutputProtocol = void) if ( 117 isService!Interface && isTProtocol!InputProtocol && 118 (isTProtocol!OutputProtocol || is(OutputProtocol == void)) 119 ) { 120 mixin({ 121 static if (isDerivedService!Interface) { 122 string code = "class TAsyncClient : " ~ 123 "TAsyncClient!(BaseService!Interface, InputProtocol, OutputProtocol), " ~ 124 "TAsyncClientBase!Interface {\n"; 125 code ~= q{ 126 this(TAsyncTransport trans, TTransportFactory tf, TProtocolFactory pf) { 127 this(trans, tf, tf, pf, pf); 128 } 129 130 this(TAsyncTransport trans, TTransportFactory itf, 131 TTransportFactory otf, TProtocolFactory ipf, TProtocolFactory opf 132 ) { 133 super(trans, itf, otf, ipf, opf); 134 client_ = new typeof(client_)(iprot_, oprot_); 135 } 136 137 private TClient!(Interface, IProt, OProt) client_; 138 }; 139 } else { 140 string code = "class TAsyncClient : TAsyncClientBase!Interface {"; 141 code ~= q{ 142 alias InputProtocol IProt; 143 static if (isTProtocol!OutputProtocol) { 144 alias OutputProtocol OProt; 145 } else { 146 static assert(is(OutputProtocol == void)); 147 alias InputProtocol OProt; 148 } 149 150 this(TAsyncTransport trans, TTransportFactory tf, TProtocolFactory pf) { 151 this(trans, tf, tf, pf, pf); 152 } 153 154 this(TAsyncTransport trans, TTransportFactory itf, 155 TTransportFactory otf, TProtocolFactory ipf, TProtocolFactory opf 156 ) { 157 import std.exception; 158 transport_ = trans; 159 160 auto ip = itf.getTransport(trans); 161 TTransport op = void; 162 if (itf == otf) { 163 op = ip; 164 } else { 165 op = otf.getTransport(trans); 166 } 167 168 auto iprot = ipf.getProtocol(ip); 169 iprot_ = cast(IProt)iprot; 170 enforce(iprot_, new TException(text("Input protocol not of the " ~ 171 "specified concrete type (", IProt.stringof, ")."))); 172 173 auto oprot = opf.getProtocol(op); 174 oprot_ = cast(OProt)oprot; 175 enforce(oprot_, new TException(text("Output protocol not of the " ~ 176 "specified concrete type (", OProt.stringof, ")."))); 177 178 client_ = new typeof(client_)(iprot_, oprot_); 179 } 180 181 override TAsyncTransport transport() @property { 182 return transport_; 183 } 184 185 protected TAsyncTransport transport_; 186 protected IProt iprot_; 187 protected OProt oprot_; 188 private TClient!(Interface, IProt, OProt) client_; 189 }; 190 } 191 192 foreach (methodName; 193 FilterMethodNames!(Interface, __traits(derivedMembers, Interface)) 194 ) { 195 string[] paramList; 196 string[] paramNames; 197 foreach (i, _; ParameterTypeTuple!(mixin("Interface." ~ methodName))) { 198 immutable paramName = "param" ~ to!string(i + 1); 199 immutable storage = ParameterStorageClassTuple!( 200 mixin("Interface." ~ methodName))[i]; 201 202 paramList ~= ((storage & ParameterStorageClass.ref_) ? "ref " : "") ~ 203 "ParameterTypeTuple!(Interface." ~ methodName ~ ")[" ~ 204 to!string(i) ~ "] " ~ paramName; 205 paramNames ~= paramName; 206 } 207 paramList ~= "TCancellation cancellation = null"; 208 209 immutable returnTypeCode = "ReturnType!(Interface." ~ methodName ~ ")"; 210 code ~= "TFuture!(" ~ returnTypeCode ~ ") " ~ methodName ~ "(" ~ 211 ctfeJoin(paramList) ~ ") {\n"; 212 213 // Create the future instance that will repesent the result. 214 code ~= "auto promise = new TPromise!(" ~ returnTypeCode ~ ");\n"; 215 216 // Prepare delegate which executes the TClient method call. 217 code ~= "auto work = {\n"; 218 code ~= "try {\n"; 219 code ~= "static if (is(ReturnType!(Interface." ~ methodName ~ 220 ") == void)) {\n"; 221 code ~= "client_." ~ methodName ~ "(" ~ ctfeJoin(paramNames) ~ ");\n"; 222 code ~= "promise.succeed();\n"; 223 code ~= "} else {\n"; 224 code ~= "auto result = client_." ~ methodName ~ "(" ~ 225 ctfeJoin(paramNames) ~ ");\n"; 226 code ~= "promise.succeed(result);\n"; 227 code ~= "}\n"; 228 code ~= "} catch (Exception e) {\n"; 229 code ~= "promise.fail(e);\n"; 230 code ~= "}\n"; 231 code ~= "};\n"; 232 233 // If the request is cancelled, set the result promise to cancelled 234 // as well. This could be moved into an additional TAsyncWorkItem 235 // delegate parameter. 236 code ~= q{ 237 if (cancellation) { 238 cancellation.triggering.addCallback({ 239 promise.cancel(); 240 }); 241 } 242 }; 243 244 // Enqueue the work item and immediately return the promise (resp. its 245 // future interface). 246 code ~= "transport_.asyncManager.execute(transport_, work, cancellation);\n"; 247 code ~= "return promise;\n"; 248 code ~= "}\n"; 249 250 } 251 252 code ~= "}\n"; 253 return code; 254 }()); 255 }