/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module thrift.internal.resource_pool;

import core.time : Duration, dur, TickDuration;
import std.algorithm : minPos, reduce, remove;
import std.array : array, empty;
import std.exception : enforce;
import std.conv : to;
import std.random : randomCover, rndGen;
import std.range : zip;
import thrift.internal.algorithm : removeEqual;

/**
 * A pool of resources, which can be iterated over, and where resources that
 * have failed too often can be temporarily disabled.
 *
 * This class is oblivious to the actual resource type managed.
 */
final class TResourcePool(Resource) {
  /**
   * Constructs a new instance.
   *
   * Params:
   *   resources = The initial members of the pool.
   */
  this(Resource[] resources) {
    resources_ = resources;
  }

  /**
   * Adds a resource to the pool.
   */
  void add(Resource resource) {
    resources_ ~= resource;
  }

  /**
   * Removes a resource from the pool.
   *
   * Fault information recorded for the resource is not touched.
   *
   * Returns: Whether the resource could be found in the pool.
   */
  bool remove(Resource resource) {
    auto oldLength = resources_.length;
    resources_ = removeEqual(resources_, resource);
    return resources_.length < oldLength;
  }

  /**
   * Returns an »enriched« input range to iterate over the pool members.
   */
  static struct Range {
    /**
     * Whether the range is empty.
     *
     * This is the case if all members of the pool have been popped (or skipped
     * because they were disabled) and TResourcePool.cycle is false, or there
     * is no element to return in cycle mode because all have been temporarily
     * disabled.
     */
    bool empty() @property {
      // If no resources are in the pool, the range will never become non-empty.
      if (resources_.empty) return true;

      // If we already got the next resource in the cache, it doesn't matter
      // whether there are more.
      if (cached_) return false;

      size_t examineCount;
      if (parent_.cycle) {
        // We want to check all the resources, but not iterate more than once
        // to avoid spinning in a loop if nothing is available.
        examineCount = resources_.length;
      } else {
        // When not in cycle mode, we just iterate the list exactly once. If all
        // items have been consumed, the interval below is empty. nextIndex_
        // can only be past the end if cycle mode was switched off while this
        // range was in use; guard against the unsigned subtraction wrapping.
        examineCount = nextIndex_ < resources_.length ?
          resources_.length - nextIndex_ : 0;
      }

      foreach (i; 0 .. examineCount) {
        auto r = resources_[(nextIndex_ + i) % resources_.length];
        auto fi = r in parent_.faultInfos_;

        // A reset time different from .init marks a disabled resource.
        if (fi && fi.resetTime != fi.resetTime.init) {
          if (fi.resetTime < parent_.getCurrentTick_()) {
            // The timeout expired, remove the resource from the list and go
            // ahead trying it.
            parent_.faultInfos_.remove(r);
          } else {
            // The timeout didn't expire yet, try the next resource.
            continue;
          }
        }

        cache_ = r;
        cached_ = true;
        nextIndex_ = nextIndex_ + i + 1;
        return false;
      }

      // If we get here, all resources are currently inactive or the non-cycle
      // pool has been exhausted, so there is nothing we can do.
      nextIndex_ = nextIndex_ + examineCount;
      return true;
    }

    /**
     * Returns the first resource in the range.
     */
    Resource front() @property {
      enforce(!empty);
      return cache_;
    }

    /**
     * Removes the first resource from the range.
     *
     * Usually, this is combined with a call to TResourcePool.recordSuccess()
     * or recordFault().
     */
    void popFront() {
      enforce(!empty);
      cached_ = false;
    }

    /**
     * Returns whether the range will become non-empty at some point in the
     * future, and provides additional information when this will happen and
     * what will be the next resource.
     *
     * Makes only sense to call on empty ranges. Only the members of this
     * range that are currently disabled are taken into account; if none of
     * them is disabled (in cycle mode), true is returned with a zero wait
     * time.
     *
     * Params:
     *   next = The next resource that will become available.
     *   waitTime = The duration until that resource will become available
     *     (never negative).
     */
    bool willBecomeNonempty(out Resource next, out Duration waitTime) {
      // If no resources are in the pool, the range will never become non-empty.
      if (resources_.empty) return false;

      // If cycle mode is not enabled, a range never becomes non-empty after
      // being empty once, because all the elements have already been
      // used/skipped in order to become empty.
      if (!parent_.cycle) return false;

      // Find the member of this range whose disablement expires first. The
      // parent's fault table may also hold entries for resources that merely
      // have a fault streak below the threshold (reset time still .init, which
      // would compare smallest) or that are not part of this range at all, so
      // it must not be scanned directly.
      bool found;
      TickDuration earliest;
      foreach (r; resources_) {
        auto fi = r in parent_.faultInfos_;
        if (!fi || fi.resetTime == fi.resetTime.init) continue;
        if (!found || fi.resetTime < earliest) {
          found = true;
          earliest = fi.resetTime;
          next = r;
        }
      }

      if (!found) {
        // Nothing in this range is disabled, so there is nothing to wait for.
        next = cached_ ? cache_ : resources_[nextIndex_ % resources_.length];
        waitTime = Duration.zero;
        return true;
      }

      waitTime = to!Duration(earliest - parent_.getCurrentTick_());
      // The reset time may have passed since empty() was last evaluated.
      if (waitTime < Duration.zero) waitTime = Duration.zero;
      return true;
    }

  private:
    this(TResourcePool parent, Resource[] resources) {
      parent_ = parent;
      resources_ = resources;
    }

    TResourcePool parent_;

    /// All available resources. We keep a copy of it as to not get confused
    /// when resources are added to/removed from the parent pool.
    Resource[] resources_;

    /// After we have determined the next element in empty(), we store it here.
    Resource cache_;

    /// Whether there is currently something in the cache.
    bool cached_;

    /// The index to start searching from at the next call to empty().
    size_t nextIndex_;
  }

  /// Ditto
  Range opSlice() {
    // Hand the range its own copy of the member list (as documented on
    // Range.resources_), so later add()/remove() calls on the pool cannot
    // affect it.
    auto res = permute ?
      array(randomCover(resources_, rndGen)) : resources_.dup;
    return Range(this, res);
  }

  /**
   * Records a success for an operation on the given resource, cancelling a
   * fault streak, if any.
   */
  void recordSuccess(Resource resource) {
    // Removing a key that is not present is a no-op for associative arrays.
    faultInfos_.remove(resource);
  }

  /**
   * Records a fault for the given resource.
   *
   * If a resource fails consecutively at least faultDisableCount times, it is
   * temporarily disabled (no longer considered) until faultDisableDuration has
   * passed. If faultDisableCount is zero, resources are never disabled.
   */
  void recordFault(Resource resource) {
    auto fi = resource in faultInfos_;

    if (!fi) {
      faultInfos_[resource] = FaultInfo();
      fi = resource in faultInfos_;
    }

    // Saturate rather than wrap around, which would silently reset the streak.
    if (fi.count < typeof(fi.count).max) ++fi.count;

    // A limit of zero means "never disable" (see faultDisableCount).
    if (faultDisableCount != 0 && fi.count >= faultDisableCount) {
      // If the resource has hit the fault count limit, disable it for
      // specified duration.
      fi.resetTime = getCurrentTick_() + cast(TickDuration)faultDisableDuration;
    }
  }

  /**
   * Whether to randomly permute the order of the resources in the pool when
   * taking a range using opSlice().
   *
   * This can be used e.g. as a simple form of load balancing.
   */
  bool permute = true;

  /**
   * Whether to keep iterating over the pool members after all have been
   * returned/have failed once.
   */
  bool cycle = false;

  /**
   * The number of consecutive faults after which a resource is disabled until
   * faultDisableDuration has passed. Zero to never disable resources.
   *
   * Defaults to zero.
   */
  ushort faultDisableCount = 0;

  /**
   * The duration for which a resource is no longer considered after it has
   * failed too often.
   *
   * Defaults to one second.
   */
  Duration faultDisableDuration = dur!"seconds"(1);

private:
  Resource[] resources_;
  FaultInfo[Resource] faultInfos_;

  /// Function to get the current timestamp from some monotonic system clock.
  ///
  /// This is overridable to be able to write timing-insensitive unit tests.
  /// The extra indirection should not matter much performance-wise compared to
  /// the actual system call, and by its very nature this should not be on a
  /// hot path anyway.
  typeof(&TickDuration.currSystemTick) getCurrentTick_ =
    &TickDuration.currSystemTick;
}

private {
  /// Per-resource fault bookkeeping used by TResourcePool.
  struct FaultInfo {
    /// Number of consecutive faults recorded since the last success.
    ushort count;

    /// Point in time until which the resource is disabled; .init if the
    /// resource is not currently disabled.
    TickDuration resetTime;
  }
}

unittest {
  auto pool = new TResourcePool!Object([]);
  enforce(pool[].empty);
  Object dummyRes;
  Duration dummyDur;
  enforce(!pool[].willBecomeNonempty(dummyRes, dummyDur));
}

unittest {
  import std.datetime;
  import thrift.base;

  auto a = new Object;
  auto b = new Object;
  auto c = new Object;
  auto objs = [a, b, c];
  auto pool = new TResourcePool!Object(objs);
  pool.permute = false;

  static Duration fakeClock;
  pool.getCurrentTick_ = () => cast(TickDuration)fakeClock;

  Object dummyRes = void;
  Duration dummyDur = void;

  {
    auto r = pool[];

    foreach (i, o; objs) {
      enforce(!r.empty);
      enforce(r.front == o);
      r.popFront();
    }

    enforce(r.empty);
    enforce(!r.willBecomeNonempty(dummyRes, dummyDur));
  }

  {
    pool.faultDisableCount = 2;

    enforce(pool[].front == a);
    pool.recordFault(a);
    enforce(pool[].front == a);
    pool.recordSuccess(a);
    enforce(pool[].front == a);
    pool.recordFault(a);
    enforce(pool[].front == a);
    pool.recordFault(a);

    auto r = pool[];
    enforce(r.front == b);
    r.popFront();
    enforce(r.front == c);
    r.popFront();
    enforce(r.empty);
    enforce(!r.willBecomeNonempty(dummyRes, dummyDur));

    fakeClock += 2.seconds;
    // Not in cycle mode, has to be still empty after the timeouts expired.
    enforce(r.empty);
    enforce(!r.willBecomeNonempty(dummyRes, dummyDur));

    foreach (o; objs) pool.recordSuccess(o);
  }

  {
    pool.faultDisableCount = 1;

    pool.recordFault(a);
    pool.recordFault(b);
    pool.recordFault(c);

    auto r = pool[];
    enforce(r.empty);
    enforce(!r.willBecomeNonempty(dummyRes, dummyDur));

    foreach (o; objs) pool.recordSuccess(o);
  }

  pool.cycle = true;

  {
    auto r = pool[];

    foreach (o; objs ~ objs) {
      enforce(!r.empty);
      enforce(r.front == o);
      r.popFront();
    }
  }

  {
    pool.faultDisableCount = 2;

    enforce(pool[].front == a);
    pool.recordFault(a);
    enforce(pool[].front == a);
    pool.recordSuccess(a);
    enforce(pool[].front == a);
    pool.recordFault(a);
    enforce(pool[].front == a);
    pool.recordFault(a);

    auto r = pool[];
    enforce(r.front == b);
    r.popFront();
    enforce(r.front == c);
    r.popFront();
    enforce(r.front == b);

    fakeClock += 2.seconds;

    r.popFront();
    enforce(r.front == c);

    r.popFront();
    enforce(r.front == a);

    enforce(pool[].front == a);

    foreach (o; objs) pool.recordSuccess(o);
  }

  {
    pool.faultDisableCount = 1;

    pool.recordFault(a);
    fakeClock += 1.msecs;
    pool.recordFault(b);
    fakeClock += 1.msecs;
    pool.recordFault(c);

    auto r = pool[];
    enforce(r.empty);

    // Make sure willBecomeNonempty gets the order right.
    enforce(r.willBecomeNonempty(dummyRes, dummyDur));
    enforce(dummyRes == a);
    enforce(dummyDur > Duration.zero);

    foreach (o; objs) pool.recordSuccess(o);
  }
}

unittest {
  // With the default faultDisableCount of zero, faults must never disable a
  // resource.
  auto a = new Object;
  auto pool = new TResourcePool!Object([a]);
  pool.permute = false;

  pool.recordFault(a);
  pool.recordFault(a);
  enforce(!pool[].empty);
  enforce(pool[].front == a);
}

unittest {
  // willBecomeNonempty() must only consider disabled members of the range
  // itself, not resources with a fault streak below the threshold or
  // resources that are not part of the range.
  auto a = new Object;
  auto b = new Object;
  auto pool = new TResourcePool!Object([a]);
  pool.permute = false;
  pool.cycle = true;
  pool.faultDisableCount = 2;

  static Duration fakeClock;
  pool.getCurrentTick_ = () => cast(TickDuration)fakeClock;

  pool.recordFault(a);
  pool.recordFault(a);

  auto r = pool[];
  pool.add(b);
  pool.recordFault(b);

  enforce(r.empty);
  Object next;
  Duration wait;
  enforce(r.willBecomeNonempty(next, wait));
  enforce(next is a);
  enforce(wait > Duration.zero);
}