1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 /**
21  * Transports for reading from/writing to Thrift »log files«.
22  *
23  * These transports are not »stupid« sources and sinks just reading and
24  * writing bytes from a file verbatim, but organize the contents in the form
25  * of so-called »events«, which refers to the data written between two flush()
26  * calls.
27  *
 * Chunking is supported; events are guaranteed never to span chunk boundaries.
 * As a consequence, an event (including its size prefix) can never be larger
 * than the chunk size. The chunk size used is not saved with the file, so care
 * has to be taken to make sure the same chunk size is used for reading and
 * writing.
32  */
33 module thrift.transport.file;
34 
35 import core.thread : Thread;
36 import std.array : empty;
37 import std.algorithm : min, max;
38 import std.concurrency;
39 import std.conv : to;
40 import std.datetime : dur, Duration;
41 import std.datetime.stopwatch : AutoStart, StopWatch;
42 import std.exception;
43 import std.stdio : File;
44 import thrift.base;
45 import thrift.transport.base;
46 
/// Default size of a file chunk in bytes (16 MiB).
enum DEFAULT_CHUNK_SIZE = 16 << 20;

/// Integral type of the length prefix stored in front of every event.
alias EventSize = uint;

// Event length prefixes are written and read as raw native integers, while
// the on-disk format stores them in little endian byte order – refuse to
// build anywhere else.
version (LittleEndian) {
  // Supported byte order, nothing to check.
} else {
  static assert(false,
    "Little endian byte order is assumed in thrift.transport.file.");
}
57 
58 /**
59  * A transport used to read log files. It can never be written to, calling
60  * write() throws.
61  *
62  * Contrary to the C++ design, explicitly opening the transport/file before
63  * using is necessary to allow manually closing the file without relying on the
64  * object lifetime. Otherwise, it's a straight port of the C++ implementation.
65  */
66 final class TFileReaderTransport : TBaseTransport {
67   /**
68    * Creates a new file writer transport.
69    *
70    * Params:
71    *   path = Path of the file to opperate on.
72    */
73   this(string path) {
74     path_ = path;
75     chunkSize_ = DEFAULT_CHUNK_SIZE;
76     readBufferSize_ = DEFAULT_READ_BUFFER_SIZE;
77     readTimeout_ = DEFAULT_READ_TIMEOUT;
78     corruptedEventSleepDuration_ = DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION;
79     maxEventSize = DEFAULT_MAX_EVENT_SIZE;
80   }
81 
82   override bool isOpen() @property {
83     return isOpen_;
84   }
85 
86   override bool peek() {
87     if (!isOpen) return false;
88 
89     // If there is no event currently processed, try fetching one from the
90     // file.
91     if (!currentEvent_) {
92       currentEvent_ = readEvent();
93 
94       if (!currentEvent_) {
95         // Still nothing there, couldn't read a new event.
96         return false;
97       }
98     }
99     // check if there is anything to read
100     return (currentEvent_.length - currentEventPos_) > 0;
101   }
102 
103   override void open() {
104     if (isOpen) return;
105     try {
106       file_ = File(path_, "rb");
107     } catch (Exception e) {
108       throw new TTransportException("Error on opening input file.",
109         TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
110     }
111     isOpen_ = true;
112   }
113 
114   override void close() {
115     if (!isOpen) return;
116 
117     file_.close();
118     isOpen_ = false;
119     readState_.resetAllValues();
120   }
121 
122   override size_t read(ubyte[] buf) {
123     enforce(isOpen, new TTransportException(
124       "Cannot read if file is not open.", TTransportException.Type.NOT_OPEN));
125 
126     // If there is no event currently processed, try fetching one from the
127     // file.
128     if (!currentEvent_) {
129       currentEvent_ = readEvent();
130 
131       if (!currentEvent_) {
132         // Still nothing there, couldn't read a new event.
133         return 0;
134       }
135     }
136 
137     auto len = buf.length;
138     auto remaining = currentEvent_.length - currentEventPos_;
139 
140     if (remaining <= len) {
141       // If less than the requested length is available, read as much as
142       // possible.
143       buf[0 .. remaining] = currentEvent_[currentEventPos_ .. $];
144       currentEvent_ = null;
145       currentEventPos_ = 0;
146       return remaining;
147     }
148 
149     // There will still be data left in the buffer after reading, pass out len
150     // bytes.
151     buf[] = currentEvent_[currentEventPos_ .. currentEventPos_ + len];
152     currentEventPos_ += len;
153     return len;
154   }
155 
156   ulong getNumChunks() {
157     enforce(isOpen, new TTransportException(
158       "Cannot get number of chunks if file not open.",
159       TTransportException.Type.NOT_OPEN));
160 
161     try {
162       auto fileSize = file_.size();
163       if (fileSize == 0) {
164         // Empty files have no chunks.
165         return 0;
166       }
167       return ((fileSize)/chunkSize_) + 1;
168     } catch (Exception e) {
169       throw new TTransportException("Error getting file size.", __FILE__,
170         __LINE__, e);
171     }
172   }
173 
174   ulong getCurChunk() {
175     return offset_ / chunkSize_;
176   }
177 
178   void seekToChunk(long chunk) {
179     enforce(isOpen, new TTransportException(
180       "Cannot get number of chunks if file not open.",
181       TTransportException.Type.NOT_OPEN));
182 
183     auto numChunks = getNumChunks();
184 
185     if (chunk < 0) {
186       // Count negative indices from the end.
187       chunk += numChunks;
188     }
189 
190     if (chunk < 0) {
191       logError("Incorrect chunk number for reverse seek, seeking to " ~
192        "beginning instead: %s", chunk);
193       chunk = 0;
194     }
195 
196     bool seekToEnd;
197     long minEndOffset;
198     if (chunk >= numChunks) {
199       logError("Trying to seek to non-existing chunk, seeking to " ~
200        "end of file instead: %s", chunk);
201       seekToEnd = true;
202       chunk = numChunks - 1;
203       // this is the min offset to process events till
204       minEndOffset = file_.size();
205     }
206 
207     readState_.resetAllValues();
208     currentEvent_ = null;
209 
210     try {
211       file_.seek(chunk * chunkSize_);
212       offset_ = chunk * chunkSize_;
213     } catch (Exception e) {
214       throw new TTransportException("Error seeking to chunk", __FILE__,
215         __LINE__, e);
216     }
217 
218     if (seekToEnd) {
219       // Never wait on the end of the file for new content, we just want to
220       // find the last one.
221       auto oldReadTimeout = readTimeout_;
222       scope (exit) readTimeout_ = oldReadTimeout;
223       readTimeout_ = dur!"hnsecs"(0);
224 
225       // Keep on reading unti the last event at point of seekToChunk call.
226       while ((offset_ + readState_.bufferPos_) < minEndOffset) {
227         if (readEvent() is null) {
228           break;
229         }
230       }
231     }
232   }
233 
234   void seekToEnd() {
235     seekToChunk(getNumChunks());
236   }
237 
238   /**
239    * The size of the chunks the file is divided into, in bytes.
240    */
241   ulong chunkSize() @property const {
242     return chunkSize_;
243   }
244 
245   /// ditto
246   void chunkSize(ulong value) @property {
247     enforce(!isOpen, new TTransportException(
248       "Cannot set chunk size after TFileReaderTransport has been opened."));
249     enforce(value > EventSize.sizeof, new TTransportException("Chunks must " ~
250       "be large enough to accommodate at least a single byte of payload data."));
251     chunkSize_ = value;
252   }
253 
254   /**
255    * If positive, wait the specified duration for new data when arriving at
256    * end of file. If negative, wait forever (tailing mode), waking up to check
257    * in the specified interval. If zero, do not wait at all.
258    *
259    * Defaults to 500 ms.
260    */
261   Duration readTimeout() @property const {
262     return readTimeout_;
263   }
264 
265   /// ditto
266   void readTimeout(Duration value) @property {
267     readTimeout_ = value;
268   }
269 
270   /// ditto
271   enum DEFAULT_READ_TIMEOUT = dur!"msecs"(500);
272 
273   /**
274    * Read buffer size, in bytes.
275    *
276    * Defaults to 1 MiB.
277    */
278   size_t readBufferSize() @property const {
279     return readBufferSize_;
280   }
281 
282   /// ditto
283   void readBufferSize(size_t value) @property {
284     if (readBuffer_) {
285       enforce(value <= readBufferSize_,
286         "Cannot shrink read buffer after first read.");
287       readBuffer_.length = value;
288     }
289     readBufferSize_ = value;
290   }
291 
292   /// ditto
293   enum DEFAULT_READ_BUFFER_SIZE = 1 * 1024 * 1024;
294 
295   /**
296    * Arbitrary event size limit, in bytes. Must be smaller than chunk size.
297    *
298    * Defaults to zero (no limit).
299    */
300   size_t maxEventSize() @property const {
301     return maxEventSize_;
302   }
303 
304   /// ditto
305   void maxEventSize(size_t value) @property {
306     enforce(value <= chunkSize_ - EventSize.sizeof, "Events cannot span " ~
307       "mutiple chunks, maxEventSize must be smaller than chunk size.");
308     maxEventSize_ = value;
309   }
310 
311   /// ditto
312   enum DEFAULT_MAX_EVENT_SIZE = 0;
313 
314   /**
315    * The interval at which the thread wakes up to check for the next chunk
316    * in tailing mode.
317    *
318    * Defaults to one second.
319    */
320   Duration corruptedEventSleepDuration() const {
321     return corruptedEventSleepDuration_;
322   }
323 
324   /// ditto
325   void corruptedEventSleepDuration(Duration value) {
326     corruptedEventSleepDuration_ = value;
327   }
328 
329   /// ditto
330   enum DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION = dur!"seconds"(1);
331 
332   /**
333    * The maximum number of corrupted events tolerated before the whole chunk
334    * is skipped.
335    *
336    * Defaults to zero.
337    */
338   uint maxCorruptedEvents() @property const {
339     return maxCorruptedEvents_;
340   }
341 
342   /// ditto
343   void maxCorruptedEvents(uint value) @property {
344     maxCorruptedEvents_ = value;
345   }
346 
347   /// ditto
348   enum DEFAULT_MAX_CORRUPTED_EVENTS = 0;
349 
350 private:
351   ubyte[] readEvent() {
352     if (!readBuffer_) {
353       readBuffer_ = new ubyte[readBufferSize_];
354     }
355 
356     bool timeoutExpired;
357     while (1) {
358       // read from the file if read buffer is exhausted
359       if (readState_.bufferPos_ == readState_.bufferLen_) {
360         // advance the offset pointer
361         offset_ += readState_.bufferLen_;
362 
363         try {
364           // Need to clear eof flag before reading, otherwise tailing a file
365           // does not work.
366           file_.clearerr();
367 
368           auto usedBuf = file_.rawRead(readBuffer_);
369           readState_.bufferLen_ = usedBuf.length;
370         } catch (Exception e) {
371           readState_.resetAllValues();
372           throw new TTransportException("Error while reading from file",
373             __FILE__, __LINE__, e);
374         }
375 
376         readState_.bufferPos_ = 0;
377         readState_.lastDispatchPos_ = 0;
378 
379         if (readState_.bufferLen_ == 0) {
380           // Reached end of file.
381           if (readTimeout_ < dur!"hnsecs"(0)) {
382             // Tailing mode, sleep for the specified duration and try again.
383             Thread.sleep(-readTimeout_);
384             continue;
385           } else if (readTimeout_ == dur!"hnsecs"(0) || timeoutExpired) {
386             // Either no timeout set, or it has already expired.
387             readState_.resetState(0);
388             return null;
389           } else {
390             // Timeout mode, sleep for the specified amount of time and retry.
391             Thread.sleep(readTimeout_);
392             timeoutExpired = true;
393             continue;
394           }
395         }
396       }
397 
398       // Attempt to read an event from the buffer.
399       while (readState_.bufferPos_ < readState_.bufferLen_) {
400         if (readState_.readingSize_) {
401           if (readState_.eventSizeBuffPos_ == 0) {
402             if ((offset_ + readState_.bufferPos_)/chunkSize_ !=
403               ((offset_ + readState_.bufferPos_ + 3)/chunkSize_))
404             {
405               readState_.bufferPos_++;
406               continue;
407             }
408           }
409 
410           readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
411             readBuffer_[readState_.bufferPos_++];
412 
413           if (readState_.eventSizeBuffPos_ == 4) {
414             auto size = (cast(uint[])readState_.eventSizeBuff_)[0];
415 
416             if (size == 0) {
417               // This is part of the zero padding between chunks.
418               readState_.resetState(readState_.lastDispatchPos_);
419               continue;
420             }
421 
422             // got a valid event
423             readState_.readingSize_ = false;
424             readState_.eventLen_ = size;
425             readState_.eventPos_ = 0;
426 
427             // check if the event is corrupted and perform recovery if required
428             if (isEventCorrupted()) {
429               performRecovery();
430               // start from the top
431               break;
432             }
433           }
434         } else {
435           if (!readState_.event_) {
436             readState_.event_ = new ubyte[readState_.eventLen_];
437           }
438 
439           // take either the entire event or the remaining bytes in the buffer
440           auto reclaimBuffer = min(readState_.bufferLen_ - readState_.bufferPos_,
441             readState_.eventLen_ - readState_.eventPos_);
442 
443           // copy data from read buffer into event buffer
444           readState_.event_[
445             readState_.eventPos_ .. readState_.eventPos_ + reclaimBuffer
446           ] = readBuffer_[
447             readState_.bufferPos_ .. readState_.bufferPos_ + reclaimBuffer
448           ];
449 
450           // increment position ptrs
451           readState_.eventPos_ += reclaimBuffer;
452           readState_.bufferPos_ += reclaimBuffer;
453 
454           // check if the event has been read in full
455           if (readState_.eventPos_ == readState_.eventLen_) {
456             // Reset the read state and return the completed event.
457             auto completeEvent = readState_.event_;
458             readState_.event_ = null;
459             readState_.resetState(readState_.bufferPos_);
460             return completeEvent;
461           }
462         }
463       }
464     }
465   }
466 
467   bool isEventCorrupted() {
468     if ((maxEventSize_ > 0) && (readState_.eventLen_ > maxEventSize_)) {
469       // Event size is larger than user-speficied max-event size
470       logError("Corrupt event read: Event size (%s) greater than max " ~
471         "event size (%s)", readState_.eventLen_, maxEventSize_);
472       return true;
473     } else if (readState_.eventLen_ > chunkSize_) {
474       // Event size is larger than chunk size
475       logError("Corrupt event read: Event size (%s) greater than chunk " ~
476         "size (%s)", readState_.eventLen_, chunkSize_);
477       return true;
478     } else if (((offset_ + readState_.bufferPos_ - EventSize.sizeof) / chunkSize_) !=
479       ((offset_ + readState_.bufferPos_ + readState_.eventLen_ - EventSize.sizeof) / chunkSize_))
480     {
481       // Size indicates that event crosses chunk boundary
482       logError("Read corrupt event. Event crosses chunk boundary. " ~
483         "Event size: %s. Offset: %s", readState_.eventLen_,
484         (offset_ + readState_.bufferPos_ + EventSize.sizeof)
485       );
486 
487       return true;
488     }
489 
490     return false;
491   }
492 
493   void performRecovery() {
494     // perform some kickass recovery
495     auto curChunk = getCurChunk();
496     if (lastBadChunk_ == curChunk) {
497       numCorruptedEventsInChunk_++;
498     } else {
499       lastBadChunk_ = curChunk;
500       numCorruptedEventsInChunk_ = 1;
501     }
502 
503     if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
504       // maybe there was an error in reading the file from disk
505       // seek to the beginning of chunk and try again
506       seekToChunk(curChunk);
507     } else {
508       // Just skip ahead to the next chunk if we not already at the last chunk.
509       if (curChunk != (getNumChunks() - 1)) {
510         seekToChunk(curChunk + 1);
511       } else if (readTimeout_ < dur!"hnsecs"(0)) {
512         // We are in tailing mode, wait until there is enough data to start
513         // the next chunk.
514         while(curChunk == (getNumChunks() - 1)) {
515           Thread.sleep(corruptedEventSleepDuration_);
516         }
517         seekToChunk(curChunk + 1);
518       } else {
519         // Pretty hosed at this stage, rewind the file back to the last
520         // successful point and punt on the error.
521         readState_.resetState(readState_.lastDispatchPos_);
522         currentEvent_ = null;
523         currentEventPos_ = 0;
524 
525         throw new TTransportException("File corrupted at offset: " ~
526           to!string(offset_ + readState_.lastDispatchPos_),
527           TTransportException.Type.CORRUPTED_DATA);
528       }
529     }
530   }
531 
532   string path_;
533   File file_;
534   bool isOpen_;
535   long offset_;
536   ubyte[] currentEvent_;
537   size_t currentEventPos_;
538   ulong chunkSize_;
539   Duration readTimeout_;
540   size_t maxEventSize_;
541 
542   // Read buffer – lazily allocated on the first read().
543   ubyte[] readBuffer_;
544   size_t readBufferSize_;
545 
546   static struct ReadState {
547     ubyte[] event_;
548     size_t eventLen_;
549     size_t eventPos_;
550 
551     // keep track of event size
552     ubyte[4] eventSizeBuff_;
553     ubyte eventSizeBuffPos_;
554     bool readingSize_ = true;
555 
556     // read buffer variables
557     size_t bufferPos_;
558     size_t bufferLen_;
559 
560     // last successful dispatch point
561     size_t lastDispatchPos_;
562 
563     void resetState(size_t lastDispatchPos) {
564       readingSize_ = true;
565       eventSizeBuffPos_ = 0;
566       lastDispatchPos_ = lastDispatchPos;
567     }
568 
569     void resetAllValues() {
570       resetState(0);
571       bufferPos_ = 0;
572       bufferLen_ = 0;
573       event_ = null;
574     }
575   }
576   ReadState readState_;
577 
578   ulong lastBadChunk_;
579   uint maxCorruptedEvents_;
580   uint numCorruptedEventsInChunk_;
581   Duration corruptedEventSleepDuration_;
582 }
583 
584 /**
585  * A transport used to write log files. It can never be read from, calling
586  * read() throws.
587  *
588  * Contrary to the C++ design, explicitly opening the transport/file before
589  * using is necessary to allow manually closing the file without relying on the
590  * object lifetime.
591  */
592 final class TFileWriterTransport : TBaseTransport {
593   /**
594    * Creates a new file writer transport.
595    *
596    * Params:
597    *   path = Path of the file to opperate on.
598    */
599   this(string path) {
600     path_ = path;
601 
602     chunkSize_ = DEFAULT_CHUNK_SIZE;
603     eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE;
604     ioErrorSleepDuration = DEFAULT_IO_ERROR_SLEEP_DURATION;
605     maxFlushBytes_ = DEFAULT_MAX_FLUSH_BYTES;
606     maxFlushInterval_ = DEFAULT_MAX_FLUSH_INTERVAL;
607   }
608 
609   override bool isOpen() @property {
610     return isOpen_;
611   }
612 
613   /**
614    * A file writer transport can never be read from.
615    */
616   override bool peek() {
617     return false;
618   }
619 
620   override void open() {
621     if (isOpen) return;
622 
623     writerThread_ = spawn(
624       &writerThread,
625       path_,
626       chunkSize_,
627       maxFlushBytes_,
628       maxFlushInterval_,
629       ioErrorSleepDuration_
630     );
631     setMaxMailboxSize(writerThread_, eventBufferSize_, OnCrowding.block);
632     isOpen_ = true;
633   }
634 
635   /**
636    * Closes the transport, i.e. the underlying file and the writer thread.
637    */
638   override void close() {
639     if (!isOpen) return;
640 
641     send(writerThread_, ShutdownMessage(), thisTid);
642     receive((ShutdownMessage msg, Tid tid){});
643     isOpen_ = false;
644   }
645 
646   /**
647    * Enqueues the passed slice of data for writing and immediately returns.
648    * write() only blocks if the event buffer has been exhausted.
649    *
650    * The transport must be open when calling this.
651    *
652    * Params:
653    *   buf = Slice of data to write.
654    */
655   override void write(in ubyte[] buf) {
656     enforce(isOpen, new TTransportException(
657       "Cannot write to non-open file.", TTransportException.Type.NOT_OPEN));
658 
659     if (buf.empty) {
660       logError("Cannot write empty event, skipping.");
661       return;
662     }
663 
664     auto maxSize = chunkSize - EventSize.sizeof;
665     enforce(buf.length <= maxSize, new TTransportException(
666       "Cannot write more than " ~ to!string(maxSize) ~
667       "bytes at once due to chunk size."));
668 
669     send(writerThread_, buf.idup);
670   }
671 
672   /**
673    * Flushes any pending data to be written.
674    *
675    * The transport must be open when calling this.
676    *
677    * Throws: TTransportException if an error occurs.
678    */
679   override void flush() {
680     enforce(isOpen, new TTransportException(
681       "Cannot flush file if not open.", TTransportException.Type.NOT_OPEN));
682 
683     send(writerThread_, FlushMessage(), thisTid);
684     receive((FlushMessage msg, Tid tid){});
685   }
686 
687   /**
688    * The size of the chunks the file is divided into, in bytes.
689    *
690    * A single event (write call) never spans multiple chunks – this
691    * effectively limits the event size to chunkSize - EventSize.sizeof.
692    */
693   ulong chunkSize() @property {
694     return chunkSize_;
695   }
696 
697   /// ditto
698   void chunkSize(ulong value) @property {
699     enforce(!isOpen, new TTransportException(
700       "Cannot set chunk size after TFileWriterTransport has been opened."));
701     chunkSize_ = value;
702   }
703 
704   /**
705    * The maximum number of write() calls buffered, or zero for no limit.
706    *
707    * If the buffer is exhausted, write() will block until space becomes
708    * available.
709    */
710   size_t eventBufferSize() @property {
711     return eventBufferSize_;
712   }
713 
714   /// ditto
715   void eventBufferSize(size_t value) @property {
716     eventBufferSize_ = value;
717     if (isOpen) {
718       setMaxMailboxSize(writerThread_, value, OnCrowding.throwException);
719     }
720   }
721 
722   /// ditto
723   enum DEFAULT_EVENT_BUFFER_SIZE = 10_000;
724 
725   /**
726    * Maximum number of bytes buffered before writing and flushing the file
727    * to disk.
728    *
729    * Currently cannot be set after the first call to write().
730    */
731   size_t maxFlushBytes() @property {
732     return maxFlushBytes_;
733   }
734 
735   /// ditto
736   void maxFlushBytes(size_t value) @property {
737     maxFlushBytes_ = value;
738     if (isOpen) {
739       send(writerThread_, FlushBytesMessage(value));
740     }
741   }
742 
743   /// ditto
744   enum DEFAULT_MAX_FLUSH_BYTES = 1000 * 1024;
745 
746   /**
747    * Maximum interval between flushing the file to disk.
748    *
749    * Currenlty cannot be set after the first call to write().
750    */
751   Duration maxFlushInterval() @property {
752     return maxFlushInterval_;
753   }
754 
755   /// ditto
756   void maxFlushInterval(Duration value) @property {
757     maxFlushInterval_ = value;
758     if (isOpen) {
759       send(writerThread_, FlushIntervalMessage(value));
760     }
761   }
762 
763   /// ditto
764   enum DEFAULT_MAX_FLUSH_INTERVAL = dur!"seconds"(3);
765 
766   /**
767    * When the writer thread encounteres an I/O error, it goes pauses for a
768    * short time before trying to reopen the output file. This controls the
769    * sleep duration.
770    */
771   Duration ioErrorSleepDuration() @property {
772     return ioErrorSleepDuration_;
773   }
774 
775   /// ditto
776   void ioErrorSleepDuration(Duration value) @property {
777     ioErrorSleepDuration_ = value;
778     if (isOpen) {
779       send(writerThread_, FlushIntervalMessage(value));
780     }
781   }
782 
783   /// ditto
784   enum DEFAULT_IO_ERROR_SLEEP_DURATION = dur!"msecs"(500);
785 
786 private:
787   string path_;
788   ulong chunkSize_;
789   size_t eventBufferSize_;
790   Duration ioErrorSleepDuration_;
791   size_t maxFlushBytes_;
792   Duration maxFlushInterval_;
793   bool isOpen_;
794   Tid writerThread_;
795 }
796 
private {
  // Signals that the file should be flushed on disk. Sent to the writer
  // thread and sent back along with the tid for confirmation.
  struct FlushMessage {}

  // Signals that the writer thread should close the file and shut down. Sent
  // to the writer thread and sent back along with the tid for confirmation.
  struct ShutdownMessage {}

  // Updates the writer thread's maximum number of unflushed bytes.
  struct FlushBytesMessage {
    size_t value;
  }

  // Updates the writer thread's maximum interval between flushes.
  struct FlushIntervalMessage {
    Duration value;
  }

  // Updates the writer thread's back-off duration after I/O errors.
  struct IoErrorSleepDurationMessage {
    Duration value;
  }

  /*
   * Body of the background thread spawned by TFileWriterTransport.open().
   *
   * Appends every received event (immutable(ubyte)[]) to the file at path,
   * prefixed with its length, inserting zero padding where an event would
   * otherwise cross a chunk boundary. The file is flushed on request, after
   * more than maxFlushBytes unflushed bytes, or after maxFlushInterval.
   *
   * On I/O errors (opening, writing, flushing), the next incoming event
   * triggers attempts to reopen the file every ioErrorSleepDuration. Flush
   * and shutdown requests are always acknowledged, so the owning thread never
   * blocks forever in flush()/close().
   */
  void writerThread(
    string path,
    ulong chunkSize,
    size_t maxFlushBytes,
    Duration maxFlushInterval,
    Duration ioErrorSleepDuration
  ) {
    // Set whenever the output file is unusable (opening, writing or flushing
    // failed).
    bool ioError;
    File file;
    ulong offset;
    try {
      // Open file in appending and binary mode.
      file = File(path, "ab");
      offset = file.tell();
    } catch (Exception e) {
      logError("Error on opening output file in writer thread: %s", e);
      ioError = true;
    }

    auto flushTimer = StopWatch(AutoStart.yes);
    size_t unflushedByteCount;

    Tid shutdownRequestTid;
    bool shutdownRequested;
    while (!shutdownRequested) {
      bool forceFlush;
      Tid flushRequestTid;
      receiveTimeout(max(dur!"hnsecs"(0), maxFlushInterval - flushTimer.peek()),
        (immutable(ubyte)[] data) {
          while (ioError) {
            logError("Writer thread going to sleep for %s µs due to IO errors",
              ioErrorSleepDuration.total!"usecs");

            // Sleep for ioErrorSleepDuration, being ready to be interrupted
            // by shutdown requests. receiveTimeout() returns true if a
            // message was received and false if the timeout expired.
            immutable gotShutdownRequest = receiveTimeout(ioErrorSleepDuration,
              (ShutdownMessage msg, Tid tid) {
                shutdownRequested = true;
                shutdownRequestTid = tid;
              });
            if (gotShutdownRequest) {
              // Just drop all events and exit the main loop as to not block
              // application shutdown with our tries which we must assume to
              // fail.
              return;
            }

            try {
              file = File(path, "ab");
              // The file might have been replaced/modified in the meantime,
              // so the chunk bookkeeping has to start from its real end.
              offset = file.tell();
              unflushedByteCount = 0;
              ioError = false;
              logError("Output file %s reopened during writer thread error " ~
                "recovery", path);
            } catch (Exception e) {
              logError("Unable to reopen output file %s during writer " ~
                "thread error recovery", path);
            }
          }

          try {
            // Make sure the event does not cross the chunk boundary by
            // writing a padding consisting of zeroes if it would.
            auto chunk1 = offset / chunkSize;
            auto chunk2 = (offset + EventSize.sizeof + data.length - 1) / chunkSize;

            if (chunk1 != chunk2) {
              auto padding = cast(size_t)(((chunk1 + 1) * chunkSize) - offset);
              file.rawWrite(new ubyte[padding]);
              unflushedByteCount += padding;
              offset += padding;
            }

            // TODO: 2 syscalls here, is this a problem performance-wise?
            // Probably abysmal performance on Windows due to rawWrite
            // implementation.
            uint len = cast(uint)data.length;
            file.rawWrite(cast(ubyte[])(&len)[0..1]);
            file.rawWrite(data);

            auto bytesWritten = EventSize.sizeof + data.length;
            unflushedByteCount += bytesWritten;
            offset += bytesWritten;
          } catch (Exception e) {
            // Let the next event trigger the reopen/recovery logic above
            // instead of letting the exception kill the thread (which would
            // leave flush() and close() waiting forever).
            logError("Error writing event to output file %s in writer " ~
              "thread: %s", path, e);
            ioError = true;
          }
        }, (FlushBytesMessage msg) {
          maxFlushBytes = msg.value;
        }, (FlushIntervalMessage msg) {
          maxFlushInterval = msg.value;
        }, (IoErrorSleepDurationMessage msg) {
          ioErrorSleepDuration = msg.value;
        }, (FlushMessage msg, Tid tid) {
          forceFlush = true;
          flushRequestTid = tid;
        }, (OwnerTerminated msg) {
          shutdownRequested = true;
        }, (ShutdownMessage msg, Tid tid) {
          shutdownRequested = true;
          shutdownRequestTid = tid;
        }
      );

      if (ioError) {
        // Nothing can be flushed to an unusable file, but explicit flush
        // requests still have to be confirmed – flush() waits for the reply.
        if (forceFlush) send(flushRequestTid, FlushMessage(), thisTid);
        continue;
      }

      bool flush;
      if (forceFlush || shutdownRequested || unflushedByteCount > maxFlushBytes) {
        flush = true;
      } else if (cast(Duration)flushTimer.peek() > maxFlushInterval) {
        if (unflushedByteCount == 0) {
          // If the flush timer is due, but no data has been written, don't
          // needlessly fsync, but do reset the timer.
          flushTimer.reset();
        } else {
          flush = true;
        }
      }

      if (flush) {
        try {
          file.flush();
        } catch (Exception e) {
          logError("Error flushing output file %s in writer thread: %s",
            path, e);
          ioError = true;
        }
        flushTimer.reset();
        unflushedByteCount = 0;
      }

      // forceFlush implies flush, so the request has been handled by now.
      if (forceFlush) send(flushRequestTid, FlushMessage(), thisTid);
    }

    try {
      file.close();
    } catch (Exception e) {
      // Still confirm the shutdown below, close() would block otherwise.
      logError("Error closing output file %s in writer thread: %s", path, e);
    }

    if (shutdownRequestTid != Tid.init) {
      send(shutdownRequestTid, ShutdownMessage(), thisTid);
    }
  }
}
949 
950 version (unittest) {
951   import core.memory : GC;
952   import std.file;
953 }
954 
unittest {
  // Best-effort removal of a test output file. Any failure (e.g. the file
  // does not exist) is deliberately ignored.
  void tryRemove(string fileName) {
    try {
      remove(fileName);
    } catch (Exception) {}
  }

  immutable fileName = "unittest.dat.tmp";
  // Refuse to run rather than overwrite (and later delete) a pre-existing file.
  enforce(!exists(fileName), "Unit test output file " ~ fileName ~
    " already exists.");

  /*
   * Check the most basic reading/writing operations.
   */
  {
    scope (exit) tryRemove(fileName);

    auto writer = new TFileWriterTransport(fileName);
    writer.open();
    scope (exit) writer.close();

    writer.write([1, 2]);
    writer.write([3, 4]);
    writer.write([5, 6, 7]);
    writer.flush();

    auto reader = new TFileReaderTransport(fileName);
    reader.open();
    scope (exit) reader.close();

    // All bytes written before the flush() are read back in order.
    auto buf = new ubyte[7];
    reader.readAll(buf);
    enforce(buf == [1, 2, 3, 4, 5, 6, 7]);
  }

  /*
   * Check that chunking works as expected.
   */
  {
    scope (exit) tryRemove(fileName);

    static assert(EventSize.sizeof == 4);
    enum CHUNK_SIZE = 10;

    // Write some contents to the file. Each write() here is followed by
    // another write() or by the final flush(), and the expected raw bytes
    // below show each write() ending up as its own size-prefixed record.
    {
      auto writer = new TFileWriterTransport(fileName);
      writer.chunkSize = CHUNK_SIZE;
      writer.open();
      scope (exit) writer.close();

      writer.write([0xde]);
      writer.write([0xad]);
      // Chunk boundary here.
      writer.write([0xbe]);
      // The next write doesn't fit in the five bytes remaining, so we expect
      // padding zero bytes to be written.
      writer.write([0xef, 0x12]);

      // An event that cannot fit into a single chunk must be rejected with a
      // TTransportException. The enforce() below throws a plain Exception,
      // which is not caught here and so fails the test.
      try {
        writer.write(new ubyte[CHUNK_SIZE]);
        enforce(false, "Could write event not fitting in a single chunk.");
      } catch (TTransportException e) {}

      writer.flush();
    }

    // Check the raw contents of the file to see if chunk padding was written
    // as expected. Each record is a 4-byte size followed by the payload. The
    // expected bytes assume the size is stored little-endian.
    // The file is opened in binary mode so rawRead sees the bytes verbatim on
    // every platform.
    auto file = File(fileName, "rb");
    enforce(file.size == 26);
    auto written = new ubyte[26];
    file.rawRead(written);
    enforce(written == [
      1, 0, 0, 0, 0xde,
      1, 0, 0, 0, 0xad,
      1, 0, 0, 0, 0xbe,
      0, 0, 0, 0, 0,
      2, 0, 0, 0, 0xef, 0x12
    ]);

    // Read the data back in, getting all the events at once. The padding
    // must be skipped transparently by the reader.
    {
      auto reader = new TFileReaderTransport(fileName);
      reader.chunkSize = CHUNK_SIZE;
      reader.open();
      scope (exit) reader.close();

      auto buf = new ubyte[5];
      reader.readAll(buf);
      enforce(buf == [0xde, 0xad, 0xbe, 0xef, 0x12]);
    }
  }

  /*
   * Make sure that close() exits "quickly", i.e. that there is no problem
   * with the worker thread waking up.
   */
  {
    import std.conv : text;
    enum NUM_ITERATIONS = 1000;

    // A single close() taking at least this long is a hard failure. This
    // avoids looping over a potentially very slow operation.
    enum MAX_CLOSE_MSECS = 1500;

    // close() calls slower than this are counted as "slow". Fewer than a
    // third of all iterations may be slow.
    enum SLOW_CLOSE_MSECS = 50;

    uint numOver = 0;
    foreach (n; 0 .. NUM_ITERATIONS) {
      scope (exit) tryRemove(fileName);

      auto transport = new TFileWriterTransport(fileName);
      transport.open();

      // Write something so that the writer thread gets started.
      transport.write(cast(ubyte[])"foo");

      // Every other iteration, also call flush(), just in case that potentially
      // has any effect on how the writer thread wakes up.
      if (n & 0x1) {
        transport.flush();
      }

      // Time the call to close().
      auto sw = StopWatch(AutoStart.yes);
      transport.close();
      sw.stop();
      immutable elapsedMsecs = sw.peek().total!"msecs";

      enforce(elapsedMsecs < MAX_CLOSE_MSECS,
        text("close() took ", elapsedMsecs, "ms."));

      // Normally, it takes less than 5ms on my dev box.
      // However, if the box is heavily loaded, some of the test runs can take
      // longer. Additionally, on a Windows Server 2008 instance running in
      // a VirtualBox VM, it has been observed that about a quarter of the runs
      // takes (217 ± 1) ms, for reasons not yet known.
      if (elapsedMsecs > SLOW_CLOSE_MSECS) {
        ++numOver;
      }

      // Force garbage collection runs every now and then to make sure we
      // don't run out of OS thread handles.
      if (!(n % 100)) GC.collect();
    }

    // Make sure fewer than a third of the runs were slow. The message is
    // built from the same constant as the check so that the two stay in sync.
    enforce(numOver < NUM_ITERATIONS / 3,
      text(numOver, " iterations took more than ", SLOW_CLOSE_MSECS, " ms."));
  }
}