1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.codegen.async_client;
20 
21 import std.conv : text, to;
22 import std.traits : ParameterStorageClass, ParameterStorageClassTuple,
23   ParameterTypeTuple, ReturnType;
24 import thrift.base;
25 import thrift.async.base;
26 import thrift.codegen.base;
27 import thrift.codegen.client;
28 import thrift.internal.codegen;
29 import thrift.internal.ctfe;
30 import thrift.protocol.base;
31 import thrift.transport.base;
32 import thrift.util.cancellation;
33 import thrift.util.future;
34 
35 /**
 * Asynchronous Thrift service client which returns the results as TFutures and
37  * uses a TAsyncManager to perform the actual work.
38  *
39  * TAsyncClientBase serves as a supertype for all TAsyncClients for the same
40  * service, which might be instantiated with different concrete protocol types
41  * (there is no covariance for template type parameters), and extends
42  * TFutureInterface!Interface. If Interface is derived from another service
43  * BaseInterface, it also extends TAsyncClientBase!BaseInterface.
44  *
45  * TAsyncClient implements TAsyncClientBase and offers two constructors with
46  * the following signatures:
47  * ---
48  * this(TAsyncTransport trans, TTransportFactory tf, TProtocolFactory pf);
49  * this(TAsyncTransport trans, TTransportFactory itf, TTransportFactory otf,
50  *   TProtocolFactory ipf, TProtocolFactory opf);
51  * ---
52  *
53  * Again, if Interface represents a derived Thrift service,
54  * TAsyncClient!Interface is also derived from TAsyncClient!BaseInterface.
55  *
56  * TAsyncClient can exclusively be used with TAsyncTransports, as it needs to
57  * access the associated TAsyncManager. To set up any wrapper transports
 * (e.g. buffered, framed) on top of it and to instantiate the protocols to use,
59  * TTransportFactory and TProtocolFactory instances are passed to the
60  * constructors – the three argument constructor is a shortcut if the same
61  * transport and protocol are to be used for both input and output, which is
62  * the most common case.
63  *
64  * If the same transport factory is passed for both input and output transports,
65  * only a single wrapper transport will be created and used for both directions.
66  * This allows easy implementation of protocols like SSL.
67  *
68  * Just as TClient does, TAsyncClient also takes two optional template
69  * arguments which can be used for specifying the actual TProtocol
70  * implementation used for optimization purposes, as virtual calls can
71  * completely be eliminated then. If the actual types of the protocols
 * instantiated by the factories used do not match the ones statically
73  * specified in the template parameters, a TException is thrown during
74  * construction.
75  *
76  * Example:
77  * ---
78  * // A simple Thrift service.
79  * interface Foo { int foo(); }
80  *
81  * // Create a TAsyncSocketManager – thrift.async.libevent is used for this
82  * // example.
83  * auto manager = new TLibeventAsyncManager;
84  *
85  * // Set up an async transport to use.
86  * auto socket = new TAsyncSocket(manager, host, port);
87  *
88  * // Create a client instance.
89  * auto client = new TAsyncClient!Foo(
90  *   socket,
91  *   new TBufferedTransportFactory, // Wrap the socket in a TBufferedTransport.
92  *   new TBinaryProtocolFactory!() // Use the Binary protocol.
93  * );
94  *
95  * // Call foo and use the returned future.
96  * auto result = client.foo();
97  * pragma(msg, typeof(result)); // TFuture!int
98  * int resultValue = result.waitGet(); // Waits until the result is available.
99  * ---
100  */
interface TAsyncClientBase(Interface) if (isBaseService!Interface) :
  TFutureInterface!Interface
{
  /**
   * Returns the TAsyncTransport this client instance operates on.
   */
  @property TAsyncTransport transport();
}
109 
110 /// Ditto
interface TAsyncClientBase(Interface) if (isDerivedService!Interface) :
  TAsyncClientBase!(BaseService!Interface),
  TFutureInterface!Interface
{
  // Intentionally empty: every member comes from the two base interfaces
  // listed above (the async client base of the parent service and the future
  // interface of this service).
}
114 
115 /// Ditto
template TAsyncClient(Interface, InputProtocol = TProtocol, OutputProtocol = void) if (
  isService!Interface && isTProtocol!InputProtocol &&
  (isTProtocol!OutputProtocol || is(OutputProtocol == void))
) {
  // The whole class declaration is assembled as a string by the function
  // literal below, which is evaluated at compile time and mixed in. The string
  // consists of a fixed class header/constructor part (which differs for base
  // and derived services) followed by one generated method per service
  // method.
  mixin({
    static if (isDerivedService!Interface) {
      // Derived service: inherit from the TAsyncClient of the parent service
      // (which owns transport_, iprot_, oprot_ and the IProt/OProt aliases)
      // and only add a TClient for this service level.
      string code = "class TAsyncClient : " ~
        "TAsyncClient!(BaseService!Interface, InputProtocol, OutputProtocol), " ~
        "TAsyncClientBase!Interface {\n";
      code ~= q{
        this(TAsyncTransport trans, TTransportFactory tf, TProtocolFactory pf) {
          this(trans, tf, tf, pf, pf);
        }

        this(TAsyncTransport trans, TTransportFactory itf,
          TTransportFactory otf, TProtocolFactory ipf, TProtocolFactory opf
        ) {
          super(trans, itf, otf, ipf, opf);
          client_ = new typeof(client_)(iprot_, oprot_);
        }

        private TClient!(Interface, IProt, OProt) client_;
      };
    } else {
      // Base service: declares the protocol aliases, the shared state and the
      // constructors which set up the wrapper transports and protocols.
      string code = "class TAsyncClient : TAsyncClientBase!Interface {";
      code ~= q{
        alias InputProtocol IProt;
        static if (isTProtocol!OutputProtocol) {
          alias OutputProtocol OProt;
        } else {
          static assert(is(OutputProtocol == void));
          alias InputProtocol OProt;
        }

        this(TAsyncTransport trans, TTransportFactory tf, TProtocolFactory pf) {
          this(trans, tf, tf, pf, pf);
        }

        this(TAsyncTransport trans, TTransportFactory itf,
          TTransportFactory otf, TProtocolFactory ipf, TProtocolFactory opf
        ) {
          import std.exception;
          transport_ = trans;

          // Only create a single wrapper transport if the very same factory
          // instance was passed for both directions. This is an identity
          // check on purpose: a factory with a custom opEquals must not
          // cause two distinct factories to share one wrapper.
          auto ip = itf.getTransport(trans);
          TTransport op = (itf is otf) ? ip : otf.getTransport(trans);

          // The casts yield null if the factory produced a protocol which is
          // not of the statically specified type; report that as TException.
          auto iprot = ipf.getProtocol(ip);
          iprot_ = cast(IProt)iprot;
          enforce(iprot_, new TException(text("Input protocol not of the " ~
            "specified concrete type (", IProt.stringof, ").")));

          auto oprot = opf.getProtocol(op);
          oprot_ = cast(OProt)oprot;
          enforce(oprot_, new TException(text("Output protocol not of the " ~
            "specified concrete type (", OProt.stringof, ").")));

          client_ = new typeof(client_)(iprot_, oprot_);
        }

        override TAsyncTransport transport() @property {
          return transport_;
        }

        protected TAsyncTransport transport_;
        protected IProt iprot_;
        protected OProt oprot_;
        private TClient!(Interface, IProt, OProt) client_;
      };
    }

    // Generate one method per service method declared directly on Interface.
    // Each takes the original parameters plus an optional TCancellation and
    // returns a TFuture of the original return type.
    foreach (methodName;
      FilterMethodNames!(Interface, __traits(derivedMembers, Interface))
    ) {
      // paramList holds the full parameter declarations for the signature,
      // paramNames just the names for forwarding to the TClient call.
      string[] paramList;
      string[] paramNames;
      foreach (i, _; ParameterTypeTuple!(mixin("Interface." ~ methodName))) {
        // Parameters are named param1, param2, ... in the generated code.
        immutable paramName = "param" ~ to!string(i + 1);
        immutable storage = ParameterStorageClassTuple!(
          mixin("Interface." ~ methodName))[i];

        // The parameter type is referred to via ParameterTypeTuple instead of
        // being spelled out; a ref storage class is carried over.
        paramList ~= ((storage & ParameterStorageClass.ref_) ? "ref " : "") ~
          "ParameterTypeTuple!(Interface." ~ methodName ~ ")[" ~
          to!string(i) ~ "] " ~ paramName;
        paramNames ~= paramName;
      }
      paramList ~= "TCancellation cancellation = null";

      immutable returnTypeCode = "ReturnType!(Interface." ~ methodName ~ ")";
      code ~= "TFuture!(" ~ returnTypeCode ~ ") " ~ methodName ~ "(" ~
        ctfeJoin(paramList) ~ ") {\n";

      // Create the future instance that will represent the result.
      code ~= "auto promise = new TPromise!(" ~ returnTypeCode ~ ");\n";

      // Prepare delegate which executes the TClient method call. void methods
      // complete the promise without a value; any Exception thrown by the
      // call is stored in the promise instead of propagating.
      code ~= "auto work = {\n";
      code ~= "try {\n";
      code ~= "static if (is(" ~ returnTypeCode ~ " == void)) {\n";
      code ~= "client_." ~ methodName ~ "(" ~ ctfeJoin(paramNames) ~ ");\n";
      code ~= "promise.succeed();\n";
      code ~= "} else {\n";
      code ~= "auto result = client_." ~ methodName ~ "(" ~
        ctfeJoin(paramNames) ~ ");\n";
      code ~= "promise.succeed(result);\n";
      code ~= "}\n";
      code ~= "} catch (Exception e) {\n";
      code ~= "promise.fail(e);\n";
      code ~= "}\n";
      code ~= "};\n";

      // If the request is cancelled, set the result promise to cancelled
      // as well. This could be moved into an additional TAsyncWorkItem
      // delegate parameter.
      code ~= q{
        if (cancellation) {
          cancellation.triggering.addCallback({
            promise.cancel();
          });
        }
      };

      // Enqueue the work item and immediately return the promise (resp. its
      // future interface).
      code ~= "transport_.asyncManager.execute(transport_, work, cancellation);\n";
      code ~= "return promise;\n";
      code ~= "}\n";

    }

    // Close the class body opened by the header above.
    code ~= "}\n";
    return code;
  }());
}