1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */19 modulethrift.internal.resource_pool;
20 21 importcore.time : Duration, dur, TickDuration;
22 importstd.algorithm : minPos, reduce, remove;
23 importstd.array : array, empty;
24 importstd.exception : enforce;
25 importstd.conv : to;
26 importstd.random : randomCover, rndGen;
27 importstd.range : zip;
28 importthrift.internal.algorithm : removeEqual;
/**
 * A pool of resources, which can be iterated over, and where resources that
 * have failed too often can be temporarily disabled.
 *
 * This class is oblivious to the actual resource type managed.
 */
final class TResourcePool(Resource) {
  /**
   * Constructs a new instance.
   *
   * Params:
   *   resources = The initial members of the pool.
   */
  this(Resource[] resources) {
    resources_ = resources;
  }

  /**
   * Adds a resource to the pool.
   */
  void add(Resource resource) {
    resources_ ~= resource;
  }

  /**
   * Removes a resource from the pool.
   *
   * Returns: Whether the resource could be found in the pool.
   */
  bool remove(Resource resource) {
    auto oldLength = resources_.length;
    resources_ = removeEqual(resources_, resource);
    return resources_.length < oldLength;
  }

  /**
   * Returns an »enriched« input range to iterate over the pool members.
   */
  static struct Range {
    /**
     * Whether the range is empty.
     *
     * This is the case if all members of the pool have been popped (or skipped
     * because they were disabled) and TResourcePool.cycle is false, or there
     * is no element to return in cycle mode because all have been temporarily
     * disabled.
     *
     * Note that this is not a pure query: it looks up the next usable
     * resource, caches it for front(), advances the internal search index and
     * drops fault records whose timeout has expired.
     */
    bool empty() @property {
      // If no resources are in the pool, the range will never become non-empty.
      if (resources_.empty) return true;

      // If we already got the next resource in the cache, it doesn't matter
      // whether there are more.
      if (cached_) return false;

      size_t examineCount;
      if (parent_.cycle) {
        // We want to check all the resources, but not iterate more than once
        // to avoid spinning in a loop if nothing is available.
        examineCount = resources_.length;
      } else {
        // When not in cycle mode, we just iterate the list exactly once. If all
        // items have been consumed, the interval below is empty.
        examineCount = resources_.length - nextIndex_;
      }

      foreach (i; 0 .. examineCount) {
        auto r = resources_[(nextIndex_ + i) % resources_.length];
        auto fi = r in parent_.faultInfos_;

        // A reset time still at its .init value means the resource has a
        // fault streak on record, but has not been disabled (yet).
        if (fi && fi.resetTime != fi.resetTime.init) {
          if (fi.resetTime < parent_.getCurrentTick_()) {
            // The timeout expired, remove the resource from the list and go
            // ahead trying it.
            parent_.faultInfos_.remove(r);
          } else {
            // The timeout didn't expire yet, try the next resource.
            continue;
          }
        }

        cache_ = r;
        cached_ = true;
        nextIndex_ = nextIndex_ + i + 1;
        return false;
      }

      // If we get here, all resources are currently inactive or the non-cycle
      // pool has been exhausted, so there is nothing we can do.
      nextIndex_ = nextIndex_ + examineCount;
      return true;
    }

    /**
     * Returns the first resource in the range.
     *
     * Throws: An exception (via enforce) if the range is empty.
     */
    Resource front() @property {
      enforce(!empty);
      return cache_;
    }

    /**
     * Removes the first resource from the range.
     *
     * Usually, this is combined with a call to TResourcePool.recordSuccess()
     * or recordFault().
     *
     * Throws: An exception (via enforce) if the range is empty.
     */
    void popFront() {
      enforce(!empty);
      cached_ = false;
    }

    /**
     * Returns whether the range will become non-empty at some point in the
     * future, and provides additional information when this will happen and
     * what will be the next resource.
     *
     * Makes only sense to call on empty ranges.
     *
     * Only the resources this range iterates over are considered. A resource
     * that is not currently disabled is reported as available right away; for
     * disabled resources, the one whose timeout expires first is chosen. The
     * reported wait time is never negative.
     *
     * Params:
     *   next = The next resource that will become available.
     *   waitTime = The duration until that resource will become available
     *     (zero if it already is).
     *
     * Returns: false if the pool is empty or TResourcePool.cycle is disabled,
     *   true otherwise.
     */
    bool willBecomeNonempty(out Resource next, out Duration waitTime) {
      // If no resources are in the pool, the range will never become non-empty.
      if (resources_.empty) return false;

      // If cycle mode is not enabled, a range never becomes non-empty after
      // being empty once, because all the elements have already been
      // used/skipped in order to become empty.
      if (!parent_.cycle) return false;

      // Walk our own snapshot instead of the parent's complete fault table:
      // the table may contain records for resources which are not part of
      // this range, or which only have a fault streak but are not disabled.
      bool haveCandidate = false;
      TickDuration earliestReset;

      foreach (r; resources_) {
        auto fi = r in parent_.faultInfos_;

        if (!fi || fi.resetTime == fi.resetTime.init) {
          // Not disabled (any more), so there is nothing to wait for.
          next = r;
          waitTime = Duration.zero;
          return true;
        }

        if (!haveCandidate || fi.resetTime < earliestReset) {
          haveCandidate = true;
          earliestReset = fi.resetTime;
          next = r;
        }
      }

      // resources_ is non-empty and every member had a reset time, so a
      // candidate has been chosen at this point. Clamp the result, as the
      // timeout might have elapsed since empty() was last evaluated and
      // callers typically sleep for the returned duration.
      auto remaining = earliestReset - parent_.getCurrentTick_();
      if (remaining > TickDuration.zero) {
        waitTime = to!Duration(remaining);
      } else {
        waitTime = Duration.zero;
      }

      return true;
    }

  private:
    this(TResourcePool parent, Resource[] resources) {
      parent_ = parent;
      resources_ = resources;
    }

    TResourcePool parent_;

    /// All available resources. We keep a copy of it as to not get confused
    /// when resources are added to/removed from the parent pool.
    Resource[] resources_;

    /// After we have determined the next element in empty(), we store it here.
    Resource cache_;

    /// Whether there is currently something in the cache.
    bool cached_;

    /// The index to start searching from at the next call to empty().
    size_t nextIndex_;
  }

  /// Ditto
  Range opSlice() {
    // The range must own its array in both cases. randomCover/array creates
    // a fresh one; without permutation we duplicate explicitly, as remove()
    // may otherwise modify the elements a live range is iterating over
    // (removeEqual is defined elsewhere -- presumably it compacts the array
    // in place like std.algorithm.remove does).
    auto res = permute ? array(randomCover(resources_, rndGen)) : resources_.dup;
    return Range(this, res);
  }

  /**
   * Records a success for an operation on the given resource, cancelling a
   * fault streak, if any.
   */
  void recordSuccess(Resource resource) {
    if (resource in faultInfos_) {
      faultInfos_.remove(resource);
    }
  }

  /**
   * Records a fault for the given resource.
   *
   * If a resource fails consecutively at least faultDisableCount times,
   * it is temporarily disabled (no longer considered) until
   * faultDisableDuration has passed. If faultDisableCount is zero, resources
   * are never disabled.
   */
  void recordFault(Resource resource) {
    auto fi = resource in faultInfos_;

    if (!fi) {
      faultInfos_[resource] = FaultInfo();
      fi = resource in faultInfos_;
    }

    // Saturate instead of wrapping around, so that a very long fault streak
    // cannot fall below the limit again.
    if (fi.count < fi.count.max) ++fi.count;

    // A limit of zero is documented as "never disable"; without the explicit
    // check, every fault would satisfy count >= 0 and disable the resource.
    if (faultDisableCount > 0 && fi.count >= faultDisableCount) {
      // If the resource has hit the fault count limit, disable it for
      // specified duration.
      fi.resetTime = getCurrentTick_() + cast(TickDuration)faultDisableDuration;
    }
  }

  /**
   * Whether to randomly permute the order of the resources in the pool when
   * taking a range using opSlice().
   *
   * This can be used e.g. as a simple form of load balancing.
   */
  bool permute = true;

  /**
   * Whether to keep iterating over the pool members after all have been
   * returned/have failed once.
   */
  bool cycle = false;

  /**
   * The number of consecutive faults after which a resource is disabled until
   * faultDisableDuration has passed. Zero to never disable resources.
   *
   * Defaults to zero.
   */
  ushort faultDisableCount = 0;

  /**
   * The duration for which a resource is no longer considered after it has
   * failed too often.
   *
   * Defaults to one second.
   */
  Duration faultDisableDuration = dur!"seconds"(1);

private:
  Resource[] resources_;
  FaultInfo[Resource] faultInfos_;

  /// Function to get the current timestamp from some monotonic system clock.
  ///
  /// This is overridable to be able to write timing-insensitive unit tests.
  /// The extra indirection should not matter much performance-wise compared to
  /// the actual system call, and by its very nature this should not be on a hot
  /// path anyway.
  typeof(&TickDuration.currSystemTick) getCurrentTick_ =
    &TickDuration.currSystemTick;
}
/// Per-resource bookkeeping for the current fault streak.
private struct FaultInfo {
  /// Number of faults recorded for the resource since the record was created.
  ushort count;

  /// Tick value after which a disabled resource is considered again. Left at
  /// its .init value as long as the resource has not been disabled.
  TickDuration resetTime;
}
// A pool without any members yields an empty range, and that range reports
// that it will never become non-empty.
unittest {
  auto emptyPool = new TResourcePool!Object([]);
  enforce(emptyPool[].empty);

  Object ignoredNext;
  Duration ignoredWait;
  auto canRecover = emptyPool[].willBecomeNonempty(ignoredNext, ignoredWait);
  enforce(!canRecover);
}
// Exercises iteration and fault handling, first in one-shot mode and then in
// cycle mode, using a fake clock so that no real time has to pass.
unittest {
  import std.datetime;
  import thrift.base;

  auto a = new Object;
  auto b = new Object;
  auto c = new Object;
  auto objs = [a, b, c];
  auto pool = new TResourcePool!Object(objs);
  pool.permute = false;

  // The clock is static so that the replacement tick source below does not
  // need a context pointer.
  static Duration fakeClock;
  pool.getCurrentTick_ = () => cast(TickDuration)fakeClock;

  Object nextRes = void;
  Duration waitTime = void;

  // Clears the fault records of all resources between the scenarios.
  void clearFaults() {
    foreach (o; objs) pool.recordSuccess(o);
  }

  // With a limit of two, `a` stays first in line through a fault, a success
  // and another fault; the final fault completes a streak of two.
  void failAUntilDisabled() {
    pool.faultDisableCount = 2;

    enforce(pool[].front == a);
    pool.recordFault(a);
    enforce(pool[].front == a);
    pool.recordSuccess(a);
    enforce(pool[].front == a);
    pool.recordFault(a);
    enforce(pool[].front == a);
    pool.recordFault(a);
  }

  // One-shot mode: every resource is returned exactly once, in order.
  {
    auto range = pool[];

    foreach (o; objs) {
      enforce(!range.empty);
      enforce(range.front == o);
      range.popFront();
    }

    enforce(range.empty);
    enforce(!range.willBecomeNonempty(nextRes, waitTime));
  }

  // One-shot mode: a disabled resource is skipped and never comes back.
  {
    failAUntilDisabled();

    auto range = pool[];
    enforce(range.front == b);
    range.popFront();
    enforce(range.front == c);
    range.popFront();
    enforce(range.empty);
    enforce(!range.willBecomeNonempty(nextRes, waitTime));

    fakeClock += 2.seconds;
    // Not in cycle mode, has to be still empty after the timeouts expired.
    enforce(range.empty);
    enforce(!range.willBecomeNonempty(nextRes, waitTime));

    clearFaults();
  }

  // One-shot mode: with all resources disabled, the range is empty at once.
  {
    pool.faultDisableCount = 1;

    pool.recordFault(a);
    pool.recordFault(b);
    pool.recordFault(c);

    auto range = pool[];
    enforce(range.empty);
    enforce(!range.willBecomeNonempty(nextRes, waitTime));

    clearFaults();
  }

  pool.cycle = true;

  // Cycle mode: iteration wraps around to the first resource again.
  {
    auto range = pool[];

    foreach (o; objs ~ objs) {
      enforce(!range.empty);
      enforce(range.front == o);
      range.popFront();
    }
  }

  // Cycle mode: a disabled resource is skipped until its timeout expired.
  {
    failAUntilDisabled();

    auto range = pool[];
    enforce(range.front == b);
    range.popFront();
    enforce(range.front == c);
    range.popFront();
    enforce(range.front == b);

    fakeClock += 2.seconds;

    range.popFront();
    enforce(range.front == c);

    range.popFront();
    enforce(range.front == a);

    enforce(pool[].front == a);

    clearFaults();
  }

  // Cycle mode: with all resources disabled, the one that failed first is
  // reported as the next one to become available.
  {
    pool.faultDisableCount = 1;

    pool.recordFault(a);
    fakeClock += 1.msecs;
    pool.recordFault(b);
    fakeClock += 1.msecs;
    pool.recordFault(c);

    auto range = pool[];
    enforce(range.empty);

    // Make sure willBecomeNonempty gets the order right.
    enforce(range.willBecomeNonempty(nextRes, waitTime));
    enforce(nextRes == a);
    enforce(waitTime > Duration.zero);

    clearFaults();
  }
}