1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
20 /**
21  * Transports for reading from/writing to Thrift »log files«.
22  *
23  * These transports are not »stupid« sources and sinks just reading and
24  * writing bytes from a file verbatim, but organize the contents in the form
25  * of so-called »events«, which refers to the data written between two flush()
26  * calls.
27  *
28  * Chunking is supported, events are guaranteed to never span chunk boundaries.
29  * As a consequence, an event can never be larger than the chunk size. The
30  * chunk size used is not saved with the file, so care has to be taken to make
31  * sure the same chunk size is used for reading and writing.
32  */
33 module thrift.transport.file;
35 import core.thread : Thread;
36 import std.array : empty;
37 import std.algorithm : min, max;
38 import std.concurrency;
39 import std.conv : to;
40 import std.datetime : dur, Duration;
41 import std.datetime.stopwatch : AutoStart, StopWatch;
42 import std.exception;
43 import std.stdio : File;
44 import thrift.base;
45 import thrift.transport.base;
/// Chunk size, in bytes, used when none is configured explicitly.
enum DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;

/// Integer type of the length prefix stored in front of every event.
alias EventSize = uint;

// Event sizes are read and written in host byte order, so only little endian
// targets are supported.
version (LittleEndian) {} else {
  static assert(false,
    "Little endian byte order is assumed in thrift.transport.file.");
}
58 /**
59  * A transport used to read log files. It can never be written to, calling
60  * write() throws.
61  *
62  * Contrary to the C++ design, explicitly opening the transport/file before
63  * using is necessary to allow manually closing the file without relying on the
64  * object lifetime. Otherwise, it's a straight port of the C++ implementation.
65  */
66 final class TFileReaderTransport : TBaseTransport {
67   /**
68    * Creates a new file writer transport.
69    *
70    * Params:
71    *   path = Path of the file to opperate on.
72    */
73   this(string path) {
74     path_ = path;
75     chunkSize_ = DEFAULT_CHUNK_SIZE;
76     readBufferSize_ = DEFAULT_READ_BUFFER_SIZE;
77     readTimeout_ = DEFAULT_READ_TIMEOUT;
78     corruptedEventSleepDuration_ = DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION;
79     maxEventSize = DEFAULT_MAX_EVENT_SIZE;
80   }
82   override bool isOpen() @property {
83     return isOpen_;
84   }
86   override bool peek() {
87     if (!isOpen) return false;
89     // If there is no event currently processed, try fetching one from the
90     // file.
91     if (!currentEvent_) {
92       currentEvent_ = readEvent();
94       if (!currentEvent_) {
95         // Still nothing there, couldn't read a new event.
96         return false;
97       }
98     }
99     // check if there is anything to read
100     return (currentEvent_.length - currentEventPos_) > 0;
101   }
103   override void open() {
104     if (isOpen) return;
105     try {
106       file_ = File(path_, "rb");
107     } catch (Exception e) {
108       throw new TTransportException("Error on opening input file.",
109         TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
110     }
111     isOpen_ = true;
112   }
114   override void close() {
115     if (!isOpen) return;
117     file_.close();
118     isOpen_ = false;
119     readState_.resetAllValues();
120   }
122   override size_t read(ubyte[] buf) {
123     enforce(isOpen, new TTransportException(
124       "Cannot read if file is not open.", TTransportException.Type.NOT_OPEN));
126     // If there is no event currently processed, try fetching one from the
127     // file.
128     if (!currentEvent_) {
129       currentEvent_ = readEvent();
131       if (!currentEvent_) {
132         // Still nothing there, couldn't read a new event.
133         return 0;
134       }
135     }
137     auto len = buf.length;
138     auto remaining = currentEvent_.length - currentEventPos_;
140     if (remaining <= len) {
141       // If less than the requested length is available, read as much as
142       // possible.
143       buf[0 .. remaining] = currentEvent_[currentEventPos_ .. $];
144       currentEvent_ = null;
145       currentEventPos_ = 0;
146       return remaining;
147     }
149     // There will still be data left in the buffer after reading, pass out len
150     // bytes.
151     buf[] = currentEvent_[currentEventPos_ .. currentEventPos_ + len];
152     currentEventPos_ += len;
153     return len;
154   }
156   ulong getNumChunks() {
157     enforce(isOpen, new TTransportException(
158       "Cannot get number of chunks if file not open.",
159       TTransportException.Type.NOT_OPEN));
161     try {
162       auto fileSize = file_.size();
163       if (fileSize == 0) {
164         // Empty files have no chunks.
165         return 0;
166       }
167       return ((fileSize)/chunkSize_) + 1;
168     } catch (Exception e) {
169       throw new TTransportException("Error getting file size.", __FILE__,
170         __LINE__, e);
171     }
172   }
174   ulong getCurChunk() {
175     return offset_ / chunkSize_;
176   }
178   void seekToChunk(long chunk) {
179     enforce(isOpen, new TTransportException(
180       "Cannot get number of chunks if file not open.",
181       TTransportException.Type.NOT_OPEN));
183     auto numChunks = getNumChunks();
185     if (chunk < 0) {
186       // Count negative indices from the end.
187       chunk += numChunks;
188     }
190     if (chunk < 0) {
191       logError("Incorrect chunk number for reverse seek, seeking to " ~
192        "beginning instead: %s", chunk);
193       chunk = 0;
194     }
196     bool seekToEnd;
197     long minEndOffset;
198     if (chunk >= numChunks) {
199       logError("Trying to seek to non-existing chunk, seeking to " ~
200        "end of file instead: %s", chunk);
201       seekToEnd = true;
202       chunk = numChunks - 1;
203       // this is the min offset to process events till
204       minEndOffset = file_.size();
205     }
207     readState_.resetAllValues();
208     currentEvent_ = null;
210     try {
211       file_.seek(chunk * chunkSize_);
212       offset_ = chunk * chunkSize_;
213     } catch (Exception e) {
214       throw new TTransportException("Error seeking to chunk", __FILE__,
215         __LINE__, e);
216     }
218     if (seekToEnd) {
219       // Never wait on the end of the file for new content, we just want to
220       // find the last one.
221       auto oldReadTimeout = readTimeout_;
222       scope (exit) readTimeout_ = oldReadTimeout;
223       readTimeout_ = dur!"hnsecs"(0);
225       // Keep on reading unti the last event at point of seekToChunk call.
226       while ((offset_ + readState_.bufferPos_) < minEndOffset) {
227         if (readEvent() is null) {
228           break;
229         }
230       }
231     }
232   }
234   void seekToEnd() {
235     seekToChunk(getNumChunks());
236   }
238   /**
239    * The size of the chunks the file is divided into, in bytes.
240    */
241   ulong chunkSize() @property const {
242     return chunkSize_;
243   }
245   /// ditto
246   void chunkSize(ulong value) @property {
247     enforce(!isOpen, new TTransportException(
248       "Cannot set chunk size after TFileReaderTransport has been opened."));
249     enforce(value > EventSize.sizeof, new TTransportException("Chunks must " ~
250       "be large enough to accommodate at least a single byte of payload data."));
251     chunkSize_ = value;
252   }
254   /**
255    * If positive, wait the specified duration for new data when arriving at
256    * end of file. If negative, wait forever (tailing mode), waking up to check
257    * in the specified interval. If zero, do not wait at all.
258    *
259    * Defaults to 500 ms.
260    */
261   Duration readTimeout() @property const {
262     return readTimeout_;
263   }
265   /// ditto
266   void readTimeout(Duration value) @property {
267     readTimeout_ = value;
268   }
270   /// ditto
271   enum DEFAULT_READ_TIMEOUT = dur!"msecs"(500);
273   /**
274    * Read buffer size, in bytes.
275    *
276    * Defaults to 1 MiB.
277    */
278   size_t readBufferSize() @property const {
279     return readBufferSize_;
280   }
282   /// ditto
283   void readBufferSize(size_t value) @property {
284     if (readBuffer_) {
285       enforce(value <= readBufferSize_,
286         "Cannot shrink read buffer after first read.");
287       readBuffer_.length = value;
288     }
289     readBufferSize_ = value;
290   }
292   /// ditto
293   enum DEFAULT_READ_BUFFER_SIZE = 1 * 1024 * 1024;
295   /**
296    * Arbitrary event size limit, in bytes. Must be smaller than chunk size.
297    *
298    * Defaults to zero (no limit).
299    */
300   size_t maxEventSize() @property const {
301     return maxEventSize_;
302   }
304   /// ditto
305   void maxEventSize(size_t value) @property {
306     enforce(value <= chunkSize_ - EventSize.sizeof, "Events cannot span " ~
307       "mutiple chunks, maxEventSize must be smaller than chunk size.");
308     maxEventSize_ = value;
309   }
311   /// ditto
312   enum DEFAULT_MAX_EVENT_SIZE = 0;
314   /**
315    * The interval at which the thread wakes up to check for the next chunk
316    * in tailing mode.
317    *
318    * Defaults to one second.
319    */
320   Duration corruptedEventSleepDuration() const {
321     return corruptedEventSleepDuration_;
322   }
324   /// ditto
325   void corruptedEventSleepDuration(Duration value) {
326     corruptedEventSleepDuration_ = value;
327   }
329   /// ditto
330   enum DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION = dur!"seconds"(1);
332   /**
333    * The maximum number of corrupted events tolerated before the whole chunk
334    * is skipped.
335    *
336    * Defaults to zero.
337    */
338   uint maxCorruptedEvents() @property const {
339     return maxCorruptedEvents_;
340   }
342   /// ditto
343   void maxCorruptedEvents(uint value) @property {
344     maxCorruptedEvents_ = value;
345   }
347   /// ditto
350 private:
351   ubyte[] readEvent() {
352     if (!readBuffer_) {
353       readBuffer_ = new ubyte[readBufferSize_];
354     }
356     bool timeoutExpired;
357     while (1) {
358       // read from the file if read buffer is exhausted
359       if (readState_.bufferPos_ == readState_.bufferLen_) {
360         // advance the offset pointer
361         offset_ += readState_.bufferLen_;
363         try {
364           // Need to clear eof flag before reading, otherwise tailing a file
365           // does not work.
366           file_.clearerr();
368           auto usedBuf = file_.rawRead(readBuffer_);
369           readState_.bufferLen_ = usedBuf.length;
370         } catch (Exception e) {
371           readState_.resetAllValues();
372           throw new TTransportException("Error while reading from file",
373             __FILE__, __LINE__, e);
374         }
376         readState_.bufferPos_ = 0;
377         readState_.lastDispatchPos_ = 0;
379         if (readState_.bufferLen_ == 0) {
380           // Reached end of file.
381           if (readTimeout_ < dur!"hnsecs"(0)) {
382             // Tailing mode, sleep for the specified duration and try again.
383             Thread.sleep(-readTimeout_);
384             continue;
385           } else if (readTimeout_ == dur!"hnsecs"(0) || timeoutExpired) {
386             // Either no timeout set, or it has already expired.
387             readState_.resetState(0);
388             return null;
389           } else {
390             // Timeout mode, sleep for the specified amount of time and retry.
391             Thread.sleep(readTimeout_);
392             timeoutExpired = true;
393             continue;
394           }
395         }
396       }
398       // Attempt to read an event from the buffer.
399       while (readState_.bufferPos_ < readState_.bufferLen_) {
400         if (readState_.readingSize_) {
401           if (readState_.eventSizeBuffPos_ == 0) {
402             if ((offset_ + readState_.bufferPos_)/chunkSize_ !=
403               ((offset_ + readState_.bufferPos_ + 3)/chunkSize_))
404             {
405               readState_.bufferPos_++;
406               continue;
407             }
408           }
410           readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
411             readBuffer_[readState_.bufferPos_++];
413           if (readState_.eventSizeBuffPos_ == 4) {
414             auto size = (cast(uint[])readState_.eventSizeBuff_)[0];
416             if (size == 0) {
417               // This is part of the zero padding between chunks.
418               readState_.resetState(readState_.lastDispatchPos_);
419               continue;
420             }
422             // got a valid event
423             readState_.readingSize_ = false;
424             readState_.eventLen_ = size;
425             readState_.eventPos_ = 0;
427             // check if the event is corrupted and perform recovery if required
428             if (isEventCorrupted()) {
429               performRecovery();
430               // start from the top
431               break;
432             }
433           }
434         } else {
435           if (!readState_.event_) {
436             readState_.event_ = new ubyte[readState_.eventLen_];
437           }
439           // take either the entire event or the remaining bytes in the buffer
440           auto reclaimBuffer = min(readState_.bufferLen_ - readState_.bufferPos_,
441             readState_.eventLen_ - readState_.eventPos_);
443           // copy data from read buffer into event buffer
444           readState_.event_[
445             readState_.eventPos_ .. readState_.eventPos_ + reclaimBuffer
446           ] = readBuffer_[
447             readState_.bufferPos_ .. readState_.bufferPos_ + reclaimBuffer
448           ];
450           // increment position ptrs
451           readState_.eventPos_ += reclaimBuffer;
452           readState_.bufferPos_ += reclaimBuffer;
454           // check if the event has been read in full
455           if (readState_.eventPos_ == readState_.eventLen_) {
456             // Reset the read state and return the completed event.
457             auto completeEvent = readState_.event_;
458             readState_.event_ = null;
459             readState_.resetState(readState_.bufferPos_);
460             return completeEvent;
461           }
462         }
463       }
464     }
465   }
467   bool isEventCorrupted() {
468     if ((maxEventSize_ > 0) && (readState_.eventLen_ > maxEventSize_)) {
469       // Event size is larger than user-speficied max-event size
470       logError("Corrupt event read: Event size (%s) greater than max " ~
471         "event size (%s)", readState_.eventLen_, maxEventSize_);
472       return true;
473     } else if (readState_.eventLen_ > chunkSize_) {
474       // Event size is larger than chunk size
475       logError("Corrupt event read: Event size (%s) greater than chunk " ~
476         "size (%s)", readState_.eventLen_, chunkSize_);
477       return true;
478     } else if (((offset_ + readState_.bufferPos_ - EventSize.sizeof) / chunkSize_) !=
479       ((offset_ + readState_.bufferPos_ + readState_.eventLen_ - EventSize.sizeof) / chunkSize_))
480     {
481       // Size indicates that event crosses chunk boundary
482       logError("Read corrupt event. Event crosses chunk boundary. " ~
483         "Event size: %s. Offset: %s", readState_.eventLen_,
484         (offset_ + readState_.bufferPos_ + EventSize.sizeof)
485       );
487       return true;
488     }
490     return false;
491   }
493   void performRecovery() {
494     // perform some kickass recovery
495     auto curChunk = getCurChunk();
496     if (lastBadChunk_ == curChunk) {
497       numCorruptedEventsInChunk_++;
498     } else {
499       lastBadChunk_ = curChunk;
500       numCorruptedEventsInChunk_ = 1;
501     }
503     if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
504       // maybe there was an error in reading the file from disk
505       // seek to the beginning of chunk and try again
506       seekToChunk(curChunk);
507     } else {
508       // Just skip ahead to the next chunk if we not already at the last chunk.
509       if (curChunk != (getNumChunks() - 1)) {
510         seekToChunk(curChunk + 1);
511       } else if (readTimeout_ < dur!"hnsecs"(0)) {
512         // We are in tailing mode, wait until there is enough data to start
513         // the next chunk.
514         while(curChunk == (getNumChunks() - 1)) {
515           Thread.sleep(corruptedEventSleepDuration_);
516         }
517         seekToChunk(curChunk + 1);
518       } else {
519         // Pretty hosed at this stage, rewind the file back to the last
520         // successful point and punt on the error.
521         readState_.resetState(readState_.lastDispatchPos_);
522         currentEvent_ = null;
523         currentEventPos_ = 0;
525         throw new TTransportException("File corrupted at offset: " ~
526           to!string(offset_ + readState_.lastDispatchPos_),
527           TTransportException.Type.CORRUPTED_DATA);
528       }
529     }
530   }
532   string path_;
533   File file_;
534   bool isOpen_;
535   long offset_;
536   ubyte[] currentEvent_;
537   size_t currentEventPos_;
538   ulong chunkSize_;
539   Duration readTimeout_;
540   size_t maxEventSize_;
542   // Read buffer – lazily allocated on the first read().
543   ubyte[] readBuffer_;
544   size_t readBufferSize_;
546   static struct ReadState {
547     ubyte[] event_;
548     size_t eventLen_;
549     size_t eventPos_;
551     // keep track of event size
552     ubyte[4] eventSizeBuff_;
553     ubyte eventSizeBuffPos_;
554     bool readingSize_ = true;
556     // read buffer variables
557     size_t bufferPos_;
558     size_t bufferLen_;
560     // last successful dispatch point
561     size_t lastDispatchPos_;
563     void resetState(size_t lastDispatchPos) {
564       readingSize_ = true;
565       eventSizeBuffPos_ = 0;
566       lastDispatchPos_ = lastDispatchPos;
567     }
569     void resetAllValues() {
570       resetState(0);
571       bufferPos_ = 0;
572       bufferLen_ = 0;
573       event_ = null;
574     }
575   }
576   ReadState readState_;
578   ulong lastBadChunk_;
579   uint maxCorruptedEvents_;
580   uint numCorruptedEventsInChunk_;
581   Duration corruptedEventSleepDuration_;
582 }
584 /**
585  * A transport used to write log files. It can never be read from, calling
586  * read() throws.
587  *
588  * Contrary to the C++ design, explicitly opening the transport/file before
589  * using is necessary to allow manually closing the file without relying on the
590  * object lifetime.
591  */
592 final class TFileWriterTransport : TBaseTransport {
593   /**
594    * Creates a new file writer transport.
595    *
596    * Params:
597    *   path = Path of the file to opperate on.
598    */
599   this(string path) {
600     path_ = path;
602     chunkSize_ = DEFAULT_CHUNK_SIZE;
603     eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE;
604     ioErrorSleepDuration = DEFAULT_IO_ERROR_SLEEP_DURATION;
605     maxFlushBytes_ = DEFAULT_MAX_FLUSH_BYTES;
606     maxFlushInterval_ = DEFAULT_MAX_FLUSH_INTERVAL;
607   }
609   override bool isOpen() @property {
610     return isOpen_;
611   }
613   /**
614    * A file writer transport can never be read from.
615    */
616   override bool peek() {
617     return false;
618   }
620   override void open() {
621     if (isOpen) return;
623     writerThread_ = spawn(
624       &writerThread,
625       path_,
626       chunkSize_,
627       maxFlushBytes_,
628       maxFlushInterval_,
629       ioErrorSleepDuration_
630     );
631     setMaxMailboxSize(writerThread_, eventBufferSize_, OnCrowding.block);
632     isOpen_ = true;
633   }
635   /**
636    * Closes the transport, i.e. the underlying file and the writer thread.
637    */
638   override void close() {
639     if (!isOpen) return;
641     send(writerThread_, ShutdownMessage(), thisTid);
642     receive((ShutdownMessage msg, Tid tid){});
643     isOpen_ = false;
644   }
646   /**
647    * Enqueues the passed slice of data for writing and immediately returns.
648    * write() only blocks if the event buffer has been exhausted.
649    *
650    * The transport must be open when calling this.
651    *
652    * Params:
653    *   buf = Slice of data to write.
654    */
655   override void write(in ubyte[] buf) {
656     enforce(isOpen, new TTransportException(
657       "Cannot write to non-open file.", TTransportException.Type.NOT_OPEN));
659     if (buf.empty) {
660       logError("Cannot write empty event, skipping.");
661       return;
662     }
664     auto maxSize = chunkSize - EventSize.sizeof;
665     enforce(buf.length <= maxSize, new TTransportException(
666       "Cannot write more than " ~ to!string(maxSize) ~
667       "bytes at once due to chunk size."));
669     send(writerThread_, buf.idup);
670   }
672   /**
673    * Flushes any pending data to be written.
674    *
675    * The transport must be open when calling this.
676    *
677    * Throws: TTransportException if an error occurs.
678    */
679   override void flush() {
680     enforce(isOpen, new TTransportException(
681       "Cannot flush file if not open.", TTransportException.Type.NOT_OPEN));
683     send(writerThread_, FlushMessage(), thisTid);
684     receive((FlushMessage msg, Tid tid){});
685   }
687   /**
688    * The size of the chunks the file is divided into, in bytes.
689    *
690    * A single event (write call) never spans multiple chunks – this
691    * effectively limits the event size to chunkSize - EventSize.sizeof.
692    */
693   ulong chunkSize() @property {
694     return chunkSize_;
695   }
697   /// ditto
698   void chunkSize(ulong value) @property {
699     enforce(!isOpen, new TTransportException(
700       "Cannot set chunk size after TFileWriterTransport has been opened."));
701     chunkSize_ = value;
702   }
704   /**
705    * The maximum number of write() calls buffered, or zero for no limit.
706    *
707    * If the buffer is exhausted, write() will block until space becomes
708    * available.
709    */
710   size_t eventBufferSize() @property {
711     return eventBufferSize_;
712   }
714   /// ditto
715   void eventBufferSize(size_t value) @property {
716     eventBufferSize_ = value;
717     if (isOpen) {
718       setMaxMailboxSize(writerThread_, value, OnCrowding.throwException);
719     }
720   }
722   /// ditto
723   enum DEFAULT_EVENT_BUFFER_SIZE = 10_000;
725   /**
726    * Maximum number of bytes buffered before writing and flushing the file
727    * to disk.
728    *
729    * Currently cannot be set after the first call to write().
730    */
731   size_t maxFlushBytes() @property {
732     return maxFlushBytes_;
733   }
735   /// ditto
736   void maxFlushBytes(size_t value) @property {
737     maxFlushBytes_ = value;
738     if (isOpen) {
739       send(writerThread_, FlushBytesMessage(value));
740     }
741   }
743   /// ditto
744   enum DEFAULT_MAX_FLUSH_BYTES = 1000 * 1024;
746   /**
747    * Maximum interval between flushing the file to disk.
748    *
749    * Currenlty cannot be set after the first call to write().
750    */
751   Duration maxFlushInterval() @property {
752     return maxFlushInterval_;
753   }
755   /// ditto
756   void maxFlushInterval(Duration value) @property {
757     maxFlushInterval_ = value;
758     if (isOpen) {
759       send(writerThread_, FlushIntervalMessage(value));
760     }
761   }
763   /// ditto
764   enum DEFAULT_MAX_FLUSH_INTERVAL = dur!"seconds"(3);
766   /**
767    * When the writer thread encounteres an I/O error, it goes pauses for a
768    * short time before trying to reopen the output file. This controls the
769    * sleep duration.
770    */
771   Duration ioErrorSleepDuration() @property {
772     return ioErrorSleepDuration_;
773   }
775   /// ditto
776   void ioErrorSleepDuration(Duration value) @property {
777     ioErrorSleepDuration_ = value;
778     if (isOpen) {
779       send(writerThread_, FlushIntervalMessage(value));
780     }
781   }
783   /// ditto
784   enum DEFAULT_IO_ERROR_SLEEP_DURATION = dur!"msecs"(500);
786 private:
  // Path of the output file, as passed to the constructor.
  string path_;
  // Settings handed to the writer thread when it is spawned in open().
  ulong chunkSize_;
  size_t eventBufferSize_;
  Duration ioErrorSleepDuration_;
  size_t maxFlushBytes_;
  Duration maxFlushInterval_;
  bool isOpen_;
  // Handle of the writer thread spawned in open().
  Tid writerThread_;
795 }
797 private {
  // Signals that the file should be flushed on disk. Sent to the writer
  // thread and sent back along with the tid for confirmation.
  struct FlushMessage {}

  // Signals that the writer thread should close the file and shut down. Sent
  // to the writer thread and sent back along with the tid for confirmation.
  struct ShutdownMessage {}

  // Carries a new value for the maxFlushBytes setting of the writer thread.
  struct FlushBytesMessage {
    size_t value;
  }

  // Carries a new value for the maxFlushInterval setting of the writer
  // thread.
  struct FlushIntervalMessage {
    Duration value;
  }

  // Carries a new value for the ioErrorSleepDuration setting of the writer
  // thread.
  struct IoErrorSleepDurationMessage {
    Duration value;
  }
818   void writerThread(
819     string path,
820     ulong chunkSize,
821     size_t maxFlushBytes,
822     Duration maxFlushInterval,
823     Duration ioErrorSleepDuration
824   ) {
825     bool errorOpening;
826     File file;
827     ulong offset;
828     try {
829       // Open file in appending and binary mode.
830       file = File(path, "ab");
831       offset = file.tell();
832     } catch (Exception e) {
833       logError("Error on opening output file in writer thread: %s", e);
834       errorOpening = true;
835     }
837     auto flushTimer = StopWatch(AutoStart.yes);
838     size_t unflushedByteCount;
840     Tid shutdownRequestTid;
841     bool shutdownRequested;
842     while (true) {
843       if (shutdownRequested) break;
845       bool forceFlush;
846       Tid flushRequestTid;
847       receiveTimeout(max(dur!"hnsecs"(0), maxFlushInterval - flushTimer.peek()),
848         (immutable(ubyte)[] data) {
849           while (errorOpening) {
850             logError("Writer thread going to sleep for %s µs due to IO errors",
851               ioErrorSleepDuration.total!"usecs");
853             // Sleep for ioErrorSleepDuration, being ready to be interrupted
854             // by shutdown requests.
855             auto timedOut = receiveTimeout(ioErrorSleepDuration,
856               (ShutdownMessage msg, Tid tid){ shutdownRequestTid = tid; });
857             if (!timedOut) {
858               // We got a shutdown request, just drop all events and exit the
859               // main loop as to not block application shutdown with our tries
860               // which we must assume to fail.
861               break;
862             }
864             try {
865               file = File(path, "ab");
866               unflushedByteCount = 0;
867               errorOpening = false;
868               logError("Output file %s reopened during writer thread error " ~
869                 "recovery", path);
870             } catch (Exception e) {
871               logError("Unable to reopen output file %s during writer " ~
872                 "thread error recovery", path);
873             }
874           }
876           // Make sure the event does not cross the chunk boundary by writing
877           // a padding consisting of zeroes if it would.
878           auto chunk1 = offset / chunkSize;
879           auto chunk2 = (offset + EventSize.sizeof + data.length - 1) / chunkSize;
881           if (chunk1 != chunk2) {
882             // TODO: The C++ implementation refetches the offset here to »keep
883             // in sync« – why would this be needed?
884             auto padding = cast(size_t)
885               ((((offset / chunkSize) + 1) * chunkSize) - offset);
886             auto zeroes = new ubyte[padding];
887             file.rawWrite(zeroes);
888             unflushedByteCount += padding;
889             offset += padding;
890           }
892           // TODO: 2 syscalls here, is this a problem performance-wise?
893           // Probably abysmal performance on Windows due to rawWrite
894           // implementation.
895           uint len = cast(uint)data.length;
896           file.rawWrite(cast(ubyte[])(&len)[0..1]);
897           file.rawWrite(data);
899           auto bytesWritten = EventSize.sizeof + data.length;
900           unflushedByteCount += bytesWritten;
901           offset += bytesWritten;
902         }, (FlushBytesMessage msg) {
903           maxFlushBytes = msg.value;
904         }, (FlushIntervalMessage msg) {
905           maxFlushInterval = msg.value;
906         }, (IoErrorSleepDurationMessage msg) {
907           ioErrorSleepDuration = msg.value;
908         }, (FlushMessage msg, Tid tid) {
909           forceFlush = true;
910           flushRequestTid = tid;
911         }, (OwnerTerminated msg) {
912           shutdownRequested = true;
913         }, (ShutdownMessage msg, Tid tid) {
914           shutdownRequested = true;
915           shutdownRequestTid = tid;
916         }
917       );
919       if (errorOpening) continue;
921       bool flush;
922       if (forceFlush || shutdownRequested || unflushedByteCount > maxFlushBytes) {
923         flush = true;
924       } else if (cast(Duration)flushTimer.peek() > maxFlushInterval) {
925         if (unflushedByteCount == 0) {
926           // If the flush timer is due, but no data has been written, don't
927           // needlessly fsync, but do reset the timer.
928           flushTimer.reset();
929         } else {
930           flush = true;
931         }
932       }
934       if (flush) {
935         file.flush();
936         flushTimer.reset();
937         unflushedByteCount = 0;
938         if (forceFlush) send(flushRequestTid, FlushMessage(), thisTid);
939       }
940     }
942     file.close();
944     if (shutdownRequestTid != Tid.init) {
945       send(shutdownRequestTid, ShutdownMessage(), thisTid);
946     }
947   }
948 }
950 version (unittest) {
951   import core.memory : GC;
952   import std.file;
953 }
unittest {
  // Removes the file, ignoring any errors (e.g. if it does not exist).
  void tryRemove(string fileName) {
    try {
      remove(fileName);
    } catch (Exception) {}
  }

  immutable fileName = "unittest.dat.tmp";
  enforce(!exists(fileName), "Unit test output file " ~ fileName ~
    " already exists.");

  /*
   * Check the most basic reading/writing operations.
   */
  {
    scope (exit) tryRemove(fileName);

    auto writer = new TFileWriterTransport(fileName);
    writer.open();
    scope (exit) writer.close();

    writer.write([1, 2]);
    writer.write([3, 4]);
    writer.write([5, 6, 7]);
    writer.flush();

    auto reader = new TFileReaderTransport(fileName);
    reader.open();
    scope (exit) reader.close();

    auto buf = new ubyte[7];
    reader.readAll(buf);
    enforce(buf == [1, 2, 3, 4, 5, 6, 7]);
  }

  /*
   * Check that chunking works as expected.
   */
  {
    scope (exit) tryRemove(fileName);

    static assert(EventSize.sizeof == 4);
    enum CHUNK_SIZE = 10;

    // Write some contents to the file.
    {
      auto writer = new TFileWriterTransport(fileName);
      writer.chunkSize = CHUNK_SIZE;
      writer.open();
      scope (exit) writer.close();

      writer.write([0xde]);
      writer.write([0xad]);
      // Chunk boundary here.
      writer.write([0xbe]);
      // The next write doesn't fit in the five bytes remaining, so we expect
      // padding zero bytes to be written.
      writer.write([0xef, 0x12]);

      try {
        writer.write(new ubyte[CHUNK_SIZE]);
        enforce(false, "Could write event not fitting in a single chunk.");
      } catch (TTransportException e) {}

      writer.flush();
    }

    // Check the raw contents of the file to see if chunk padding was written
    // as expected. The file holds binary data, so it is opened in binary
    // mode, and it is closed again before the file is removed (scope guards
    // run in reverse order).
    auto file = File(fileName, "rb");
    scope (exit) file.close();
    enforce(file.size == 26);
    auto written = new ubyte[26];
    file.rawRead(written);
    enforce(written == [
      1, 0, 0, 0, 0xde,
      1, 0, 0, 0, 0xad,
      1, 0, 0, 0, 0xbe,
      0, 0, 0, 0, 0,
      2, 0, 0, 0, 0xef, 0x12
    ]);

    // Read the data back in, getting all the events at once.
    {
      auto reader = new TFileReaderTransport(fileName);
      reader.chunkSize = CHUNK_SIZE;
      reader.open();
      scope (exit) reader.close();

      auto buf = new ubyte[5];
      reader.readAll(buf);
      enforce(buf == [0xde, 0xad, 0xbe, 0xef, 0x12]);
    }
  }

  /*
   * Make sure that close() exits "quickly", i.e. that there is no problem
   * with the worker thread waking up.
   */
  {
    import std.conv : text;
    enum NUM_ITERATIONS = 1000;

    uint numOver = 0;
    foreach (n; 0 .. NUM_ITERATIONS) {
      scope (exit) tryRemove(fileName);

      auto transport = new TFileWriterTransport(fileName);
      transport.open();

      // Write something so that the writer thread gets started.
      transport.write(cast(ubyte[])"foo");

      // Every other iteration, also call flush(), just in case that potentially
      // has any effect on how the writer thread wakes up.
      if (n & 0x1) {
        transport.flush();
      }

      // Time the call to close().
      auto sw = StopWatch(AutoStart.yes);
      transport.close();
      sw.stop();

      // If any attempt takes 1500 ms or more, treat that as a fatal failure
      // to avoid looping over a potentially very slow operation.
      enforce(sw.peek().total!"msecs" < 1500,
        text("close() took ", sw.peek().total!"msecs", "ms."));

      // Normally, it takes less than 5ms on my dev box.
      // However, if the box is heavily loaded, some of the test runs can take
      // longer. Additionally, on a Windows Server 2008 instance running in
      // a VirtualBox VM, it has been observed that about a quarter of the runs
      // takes (217 ± 1) ms, for reasons not yet known.
      if (sw.peek().total!"msecs" > 50) {
        ++numOver;
      }

      // Force garbage collection runs every now and then to make sure we
      // don't run out of OS thread handles.
      if (!(n % 100)) GC.collect();
    }

    // Make sure fewer than a third of the runs took longer than 50 ms.
    enforce(numOver < NUM_ITERATIONS / 3,
      text(numOver, " iterations took more than 50 ms."));
  }
}