/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module thrift.codegen.client_pool;

import core.time : dur, Duration, TickDuration;
import std.traits : ParameterTypeTuple, ReturnType;
import thrift.base;
import thrift.codegen.base;
import thrift.codegen.client;
import thrift.internal.codegen;
import thrift.internal.resource_pool;

/**
 * Manages a pool of TClients for the given interface, forwarding RPC calls to
 * members of the pool.
 *
 * If a request fails, another client from the pool is tried, and optionally,
 * a client is disabled for a configurable amount of time if it fails too
 * often. If all clients fail (and keepTrying is false), a
 * TCompoundOperationException is thrown, containing all the collected RPC
 * exceptions.
 */
class TClientPool(Interface) if (isService!Interface) : Interface {
  /// Shorthand for TClientBase!Interface, the client type this instance
  /// operates on.
  alias TClientBase!Interface Client;

  /**
   * Creates a new instance and adds the given clients to the pool.
   *
   * Also installs the default rpcFaultFilter, which treats
   * TTransportException and TApplicationException as client faults.
   *
   * Params:
   *   clients = The clients the pool initially consists of.
   */
  this(Client[] clients) {
    pool_ = new TResourcePool!Client(clients);

    rpcFaultFilter = (Exception e) {
      import thrift.protocol.base;
      import thrift.transport.base;
      return (
        (cast(TTransportException)e !is null) ||
        (cast(TApplicationException)e !is null)
      );
    };
  }

  /**
   * Executes an operation on the first currently active client.
   *
   * If the operation fails (throws an exception for which rpcFaultFilter is
   * true), the failure is recorded and the next client in the pool is tried.
   *
   * Params:
   *   work = The operation to run against a client from the pool.
   *
   * Returns: Whatever work returned for the first client that did not fail
   *   with an rpc exception.
   *
   * Throws: Any non-rpc exception that occurs, a TCompoundOperationException
   *   if all clients failed with an rpc exception (if keepTrying is false).
   *
   * Example:
   * ---
   * interface Foo { string bar(); }
   * auto poolClient = tClientPool([tClient!Foo(someProtocol)]);
   * auto result = poolClient.execute((c){ return c.bar(); });
   * ---
   */
  ResultType execute(ResultType)(scope ResultType delegate(Client) work) {
    // Forward the caller's result type explicitly: executeOnPool is itself
    // parameterized on the delegate's return type, so instantiating it with
    // anything else would reject every delegate not returning that type.
    return executeOnPool!ResultType(work);
  }

  /**
   * Adds a client to the pool.
   *
   * Params:
   *   client = The client to add.
   */
  void addClient(Client client) {
    pool_.add(client);
  }

  /**
   * Removes a client from the pool.
   *
   * Params:
   *   client = The client to remove.
   *
   * Returns: Whether the client was found in the pool.
   */
  bool removeClient(Client client) {
    return pool_.remove(client);
  }

  // Generates the Interface methods; each one forwards to executeOnPool().
  mixin(poolForwardCode!Interface());

  /// Whether to open the underlying transports of a client before trying to
  /// execute a method if they are not open. This is usually desirable
  /// because it allows e.g. to automatically reconnect to a remote server
  /// if the network connection is dropped.
  ///
  /// Defaults to true.
  bool reopenTransports = true;

  /// Called to determine whether an exception comes from a client from the
  /// pool not working properly, or if it is an exception thrown at the
  /// application level.
  ///
  /// If the delegate returns true, the server/connection is considered to be
  /// at fault, if it returns false, the exception is just passed on to the
  /// caller.
  ///
  /// By default, returns true for instances of TTransportException and
  /// TApplicationException, false otherwise.
  bool delegate(Exception) rpcFaultFilter;

  /**
   * Whether to keep trying to find a working client if all have failed in a
   * row.
   *
   * Defaults to false.
   */
  bool keepTrying() const @property {
    return pool_.cycle;
  }

  /// Ditto
  void keepTrying(bool value) @property {
    pool_.cycle = value;
  }

  /**
   * Whether to use a random permutation of the client pool on every call to
   * execute(). This can be used e.g. as a simple form of load balancing.
   *
   * Defaults to true.
   */
  bool permuteClients() const @property {
    return pool_.permute;
  }

  /// Ditto
  void permuteClients(bool value) @property {
    pool_.permute = value;
  }

  /**
   * The number of consecutive faults after which a client is disabled until
   * faultDisableDuration has passed. 0 to never disable clients.
   *
   * Defaults to 0.
   */
  ushort faultDisableCount() @property {
    return pool_.faultDisableCount;
  }

  /// Ditto
  void faultDisableCount(ushort value) @property {
    pool_.faultDisableCount = value;
  }

  /**
   * The duration for which a client is no longer considered after it has
   * failed too often.
   *
   * Defaults to one second.
   */
  Duration faultDisableDuration() @property {
    return pool_.faultDisableDuration;
  }

  /// Ditto
  void faultDisableDuration(Duration value) @property {
    pool_.faultDisableDuration = value;
  }

protected:
  /**
   * Runs work against the clients handed out by the pool, one after the
   * other, until one of them completes without an rpc fault.
   *
   * This is the common implementation behind execute() and the generated
   * Interface forwarding methods.
   *
   * Params:
   *   work = The operation to run against a client from the pool.
   *
   * Returns: The result of the first invocation of work that returned
   *   normally.
   *
   * Throws: TException if the pool yields no clients at all, any exception
   *   thrown by work that rpcFaultFilter does not classify as an rpc fault,
   *   and TCompoundOperationException (carrying the rpc exceptions of the
   *   last pass over the pool) once no client is left to try.
   */
  ResultType executeOnPool(ResultType)(scope ResultType delegate(Client) work) {
    auto clients = pool_[];
    if (clients.empty) {
      throw new TException("No clients available to try.");
    }

    while (true) {
      // Collected anew for every pass over the pool, so the compound
      // exception thrown below only carries the failures of the last pass.
      Exception[] rpcExceptions;
      while (!clients.empty) {
        auto c = clients.front;
        clients.popFront();
        try {
          // Only runs if the try block is left without an exception, i.e.
          // when work(c) returned normally.
          scope (success) {
            pool_.recordSuccess(c);
          }

          if (reopenTransports) {
            // NOTE(review): open() is called unconditionally here; this
            // presumably relies on it being harmless for a transport that
            // is already open - confirm against the transport classes.
            c.inputProtocol.transport.open();
            c.outputProtocol.transport.open();
          }

          return work(c);
        } catch (Exception e) {
          if (rpcFaultFilter && rpcFaultFilter(e)) {
            // The client itself is considered broken: remember the failure
            // and move on to the next one.
            pool_.recordFault(c);
            rpcExceptions ~= e;
          } else {
            // We are dealing with a normal exception thrown by the
            // server-side method, just pass it on. As far as we are
            // concerned, the method call succeeded.
            pool_.recordSuccess(c);
            throw e;
          }
        }
      }

      // If we get here, no client succeeded during the current iteration.
      // NOTE(review): willBecomeNonempty() presumably reports whether the
      // range will yield clients again and how long to wait for that -
      // confirm against TResourcePool.
      Duration waitTime;
      Client dummy;
      if (clients.willBecomeNonempty(dummy, waitTime)) {
        if (waitTime > dur!"hnsecs"(0)) {
          import core.thread;
          Thread.sleep(waitTime);
        }
      } else {
        throw new TCompoundOperationException("All clients failed.",
          rpcExceptions);
      }
    }
  }

private:
  /// The resource pool holding the clients and their fault bookkeeping.
  TResourcePool!Client pool_;
}

private {
  // Cannot use an anonymous delegate literal for this because they aren't
  // allowed in class scope.
  /**
   * Builds the source code mixed into TClientPool to implement Interface.
   *
   * For every name in AllMemberMethodNames!Interface, a method with the
   * return and parameter types of the corresponding Interface member is
   * emitted, whose body forwards the call to executeOnPool().
   *
   * Returns: The generated D code as a string.
   */
  string poolForwardCode(Interface)() {
    string code = "";

    foreach (methodName; AllMemberMethodNames!Interface) {
      enum qn = "Interface." ~ methodName;
      code ~= "ReturnType!(" ~ qn ~ ") " ~ methodName ~
        "(ParameterTypeTuple!(" ~ qn ~ ") args) {\n";
      code ~= "return executeOnPool((Client c){ return c." ~
        methodName ~ "(args); });\n";
      code ~= "}\n";
    }

    return code;
  }
}

/**
 * TClientPool construction helper to avoid having to explicitly specify
 * the interface type, i.e. to allow the constructor being called using IFTI
 * (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
 */
TClientPool!Interface tClientPool(Interface)(
  TClientBase!Interface[] clients
) if (isService!Interface) {
  return new typeof(return)(clients);
}