/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/**
 * Transports for reading from/writing to Thrift »log files«.
 *
 * These transports are not »stupid« sources and sinks that just read and
 * write bytes from a file verbatim, but organize the contents in the form
 * of so-called »events«, which refer to the data written between two flush()
 * calls.
 *
 * Chunking is supported; events are guaranteed to never span chunk boundaries.
 * As a consequence, an event can never be larger than the chunk size. The
 * chunk size used is not saved with the file, so care has to be taken to make
 * sure the same chunk size is used for reading and writing.
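 *
 * Example:
 * A minimal write/read round trip could look like the following sketch (the
 * file name is arbitrary and error handling is omitted):
 * ---
 * auto writer = new TFileWriterTransport("service.log");
 * writer.open();
 * writer.write([1, 2]);
 * writer.write([3, 4]);
 * writer.flush();
 * writer.close();
 *
 * auto reader = new TFileReaderTransport("service.log");
 * reader.open();
 * auto buf = new ubyte[4];
 * reader.readAll(buf); // buf now contains [1, 2, 3, 4].
 * reader.close();
 * ---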
 */
module thrift.transport.file;

import core.thread : Thread;
import std.array : empty;
import std.algorithm : min, max;
import std.concurrency;
import std.conv : to;
import std.datetime : dur, Duration;
import std.datetime.stopwatch : AutoStart, StopWatch;
import std.exception;
import std.stdio : File;
import thrift.base;
import thrift.transport.base;

/// The default chunk size, in bytes.
enum DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;

/// The type used to represent event sizes in the file.
alias uint EventSize;

version (BigEndian) {
  static assert(false,
    "Little endian byte order is assumed in thrift.transport.file.");
}

/**
 * A transport used to read log files. It can never be written to, calling
 * write() throws.
 *
 * Contrary to the C++ design, explicitly opening the transport/file before
 * using it is necessary to allow manually closing the file without relying
 * on the object lifetime. Otherwise, it's a straight port of the C++
 * implementation.
 */
final class TFileReaderTransport : TBaseTransport {
  /**
   * Creates a new file reader transport.
   *
   * Params:
   *   path = Path of the file to operate on.
   */
  this(string path) {
    path_ = path;
    chunkSize_ = DEFAULT_CHUNK_SIZE;
    readBufferSize_ = DEFAULT_READ_BUFFER_SIZE;
    readTimeout_ = DEFAULT_READ_TIMEOUT;
    corruptedEventSleepDuration_ = DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION;
    maxEventSize = DEFAULT_MAX_EVENT_SIZE;
  }

  override bool isOpen() @property {
    return isOpen_;
  }

  override bool peek() {
    if (!isOpen) return false;

    // If there is no event currently being processed, try fetching one from
    // the file.
    if (!currentEvent_) {
      currentEvent_ = readEvent();

      if (!currentEvent_) {
        // Still nothing there, couldn't read a new event.
        return false;
      }
    }
    // Check if there is anything left to read.
    return (currentEvent_.length - currentEventPos_) > 0;
  }

  override void open() {
    if (isOpen) return;
    try {
      file_ = File(path_, "rb");
    } catch (Exception e) {
      throw new TTransportException("Error on opening input file.",
        TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
    }
    isOpen_ = true;
  }

  override void close() {
    if (!isOpen) return;

    file_.close();
    isOpen_ = false;
    readState_.resetAllValues();
  }

  override size_t read(ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot read if file is not open.", TTransportException.Type.NOT_OPEN));

    // If there is no event currently being processed, try fetching one from
    // the file.
    if (!currentEvent_) {
      currentEvent_ = readEvent();

      if (!currentEvent_) {
        // Still nothing there, couldn't read a new event.
        return 0;
      }
    }

    auto len = buf.length;
    auto remaining = currentEvent_.length - currentEventPos_;

    if (remaining <= len) {
      // If no more than the requested length is available, pass out all of
      // the remaining data.
      buf[0 .. remaining] = currentEvent_[currentEventPos_ .. $];
      currentEvent_ = null;
      currentEventPos_ = 0;
      return remaining;
    }

    // There will still be data left in the buffer after reading, pass out len
    // bytes.
    buf[] = currentEvent_[currentEventPos_ .. currentEventPos_ + len];
    currentEventPos_ += len;
    return len;
  }

  ulong getNumChunks() {
    enforce(isOpen, new TTransportException(
      "Cannot get number of chunks if file not open.",
      TTransportException.Type.NOT_OPEN));

    try {
      auto fileSize = file_.size();
      if (fileSize == 0) {
        // Empty files have no chunks.
        return 0;
      }
      return (fileSize / chunkSize_) + 1;
    } catch (Exception e) {
      throw new TTransportException("Error getting file size.", __FILE__,
        __LINE__, e);
    }
  }

  ulong getCurChunk() {
    return offset_ / chunkSize_;
  }

  void seekToChunk(long chunk) {
    enforce(isOpen, new TTransportException(
      "Cannot seek to chunk if file not open.",
      TTransportException.Type.NOT_OPEN));

    auto numChunks = getNumChunks();

    if (chunk < 0) {
      // Count negative indices from the end.
      chunk += numChunks;
    }

    if (chunk < 0) {
      logError("Incorrect chunk number for reverse seek, seeking to " ~
        "beginning instead: %s", chunk);
      chunk = 0;
    }

    bool seekToEnd;
    long minEndOffset;
    if (chunk >= numChunks) {
      logError("Trying to seek to non-existing chunk, seeking to " ~
        "end of file instead: %s", chunk);
      seekToEnd = true;
      chunk = numChunks - 1;
      // This is the minimum offset to read events until.
      minEndOffset = file_.size();
    }

    readState_.resetAllValues();
    currentEvent_ = null;

    try {
      file_.seek(chunk * chunkSize_);
      offset_ = chunk * chunkSize_;
    } catch (Exception e) {
      throw new TTransportException("Error seeking to chunk", __FILE__,
        __LINE__, e);
    }

    if (seekToEnd) {
      // Never wait at the end of the file for new content; we just want to
      // find the last event.
      auto oldReadTimeout = readTimeout_;
      scope (exit) readTimeout_ = oldReadTimeout;
      readTimeout_ = dur!"hnsecs"(0);

      // Keep on reading until the last event at the point of the seekToChunk
      // call.
      while ((offset_ + readState_.bufferPos_) < minEndOffset) {
        if (readEvent() is null) {
          break;
        }
      }
    }
  }

  void seekToEnd() {
    seekToChunk(getNumChunks());
  }

  /**
   * The size of the chunks the file is divided into, in bytes.
   */
  ulong chunkSize() @property const {
    return chunkSize_;
  }

  /// ditto
  void chunkSize(ulong value) @property {
    enforce(!isOpen, new TTransportException(
      "Cannot set chunk size after TFileReaderTransport has been opened."));
    enforce(value > EventSize.sizeof, new TTransportException("Chunks must " ~
      "be large enough to accommodate at least a single byte of payload data."));
    chunkSize_ = value;
  }

  /**
   * If positive, wait the specified duration for new data when arriving at
   * the end of the file. If negative, wait forever (tailing mode), waking up
   * to check at the specified interval. If zero, do not wait at all.
   *
   * Defaults to 500 ms.
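   *
   * Example:
   * A sketch of tailing a log file with an arbitrarily chosen polling
   * interval, given a TFileReaderTransport instance named reader:
   * ---
   * reader.readTimeout = dur!"msecs"(-100); // Poll every 100 ms, forever.
   * ---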
   */
  Duration readTimeout() @property const {
    return readTimeout_;
  }

  /// ditto
  void readTimeout(Duration value) @property {
    readTimeout_ = value;
  }

  /// ditto
  enum DEFAULT_READ_TIMEOUT = dur!"msecs"(500);

  /**
   * Read buffer size, in bytes.
   *
   * Defaults to 1 MiB.
   */
  size_t readBufferSize() @property const {
    return readBufferSize_;
  }

  /// ditto
  void readBufferSize(size_t value) @property {
    if (readBuffer_) {
      enforce(value <= readBufferSize_,
        "Cannot shrink read buffer after first read.");
      readBuffer_.length = value;
    }
    readBufferSize_ = value;
  }

  /// ditto
  enum DEFAULT_READ_BUFFER_SIZE = 1 * 1024 * 1024;

  /**
   * Arbitrary event size limit, in bytes. Must be smaller than the chunk size.
   *
   * Defaults to zero (no limit).
   */
  size_t maxEventSize() @property const {
    return maxEventSize_;
  }

  /// ditto
  void maxEventSize(size_t value) @property {
    enforce(value <= chunkSize_ - EventSize.sizeof, "Events cannot span " ~
      "multiple chunks, maxEventSize must be smaller than the chunk size.");
    maxEventSize_ = value;
  }

  /// ditto
  enum DEFAULT_MAX_EVENT_SIZE = 0;

  /**
   * The interval at which the reader wakes up to check for the next chunk
   * when recovering from corrupted events in tailing mode.
   *
   * Defaults to one second.
   */
  Duration corruptedEventSleepDuration() const {
    return corruptedEventSleepDuration_;
  }

  /// ditto
  void corruptedEventSleepDuration(Duration value) {
    corruptedEventSleepDuration_ = value;
  }

  /// ditto
  enum DEFAULT_CORRUPTED_EVENT_SLEEP_DURATION = dur!"seconds"(1);

  /**
   * The maximum number of corrupted events tolerated before the whole chunk
   * is skipped.
   *
   * Defaults to zero.
   */
  uint maxCorruptedEvents() @property const {
    return maxCorruptedEvents_;
  }

  /// ditto
  void maxCorruptedEvents(uint value) @property {
    maxCorruptedEvents_ = value;
  }

  /// ditto
  enum DEFAULT_MAX_CORRUPTED_EVENTS = 0;

private:
  ubyte[] readEvent() {
    if (!readBuffer_) {
      readBuffer_ = new ubyte[readBufferSize_];
    }

    bool timeoutExpired;
    while (1) {
      // Read from the file if the read buffer is exhausted.
      if (readState_.bufferPos_ == readState_.bufferLen_) {
        // Advance the offset pointer.
        offset_ += readState_.bufferLen_;

        try {
          // Need to clear the eof flag before reading, otherwise tailing a
          // file does not work.
          file_.clearerr();

          auto usedBuf = file_.rawRead(readBuffer_);
          readState_.bufferLen_ = usedBuf.length;
        } catch (Exception e) {
          readState_.resetAllValues();
          throw new TTransportException("Error while reading from file",
            __FILE__, __LINE__, e);
        }

        readState_.bufferPos_ = 0;
        readState_.lastDispatchPos_ = 0;

        if (readState_.bufferLen_ == 0) {
          // Reached end of file.
          if (readTimeout_ < dur!"hnsecs"(0)) {
            // Tailing mode, sleep for the specified duration and try again.
            Thread.sleep(-readTimeout_);
            continue;
          } else if (readTimeout_ == dur!"hnsecs"(0) || timeoutExpired) {
            // Either no timeout set, or it has already expired.
            readState_.resetState(0);
            return null;
          } else {
            // Timeout mode, sleep for the specified amount of time and retry.
            Thread.sleep(readTimeout_);
            timeoutExpired = true;
            continue;
          }
        }
      }

      // Attempt to read an event from the buffer.
      while (readState_.bufferPos_ < readState_.bufferLen_) {
        if (readState_.readingSize_) {
          if (readState_.eventSizeBuffPos_ == 0) {
            if ((offset_ + readState_.bufferPos_)/chunkSize_ !=
              ((offset_ + readState_.bufferPos_ + 3)/chunkSize_))
            {
              readState_.bufferPos_++;
              continue;
            }
          }

          readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
            readBuffer_[readState_.bufferPos_++];

          if (readState_.eventSizeBuffPos_ == 4) {
            auto size = (cast(uint[])readState_.eventSizeBuff_)[0];

            if (size == 0) {
              // This is part of the zero padding between chunks.
              readState_.resetState(readState_.lastDispatchPos_);
              continue;
            }

            // Got a valid event size.
            readState_.readingSize_ = false;
            readState_.eventLen_ = size;
            readState_.eventPos_ = 0;

            // Check if the event is corrupted and perform recovery if
            // required.
            if (isEventCorrupted()) {
              performRecovery();
              // Start from the top.
              break;
            }
          }
        } else {
          if (!readState_.event_) {
            readState_.event_ = new ubyte[readState_.eventLen_];
          }

          // Take either the entire event or the remaining bytes in the buffer.
          auto reclaimBuffer = min(readState_.bufferLen_ - readState_.bufferPos_,
            readState_.eventLen_ - readState_.eventPos_);

          // Copy data from the read buffer into the event buffer.
          readState_.event_[
            readState_.eventPos_ .. readState_.eventPos_ + reclaimBuffer
          ] = readBuffer_[
            readState_.bufferPos_ .. readState_.bufferPos_ + reclaimBuffer
          ];

          // Increment the position pointers.
          readState_.eventPos_ += reclaimBuffer;
          readState_.bufferPos_ += reclaimBuffer;

          // Check if the event has been read in full.
          if (readState_.eventPos_ == readState_.eventLen_) {
            // Reset the read state and return the completed event.
            auto completeEvent = readState_.event_;
            readState_.event_ = null;
            readState_.resetState(readState_.bufferPos_);
            return completeEvent;
          }
        }
      }
    }
  }

  bool isEventCorrupted() {
    if ((maxEventSize_ > 0) && (readState_.eventLen_ > maxEventSize_)) {
      // Event size is larger than the user-specified maximum event size.
      logError("Corrupt event read: Event size (%s) greater than max " ~
        "event size (%s)", readState_.eventLen_, maxEventSize_);
      return true;
    } else if (readState_.eventLen_ > chunkSize_) {
      // Event size is larger than the chunk size.
      logError("Corrupt event read: Event size (%s) greater than chunk " ~
        "size (%s)", readState_.eventLen_, chunkSize_);
      return true;
    } else if (((offset_ + readState_.bufferPos_ - EventSize.sizeof) / chunkSize_) !=
      ((offset_ + readState_.bufferPos_ + readState_.eventLen_ - EventSize.sizeof) / chunkSize_))
    {
      // Size indicates that the event crosses a chunk boundary.
      logError("Read corrupt event. Event crosses chunk boundary. " ~
        "Event size: %s. Offset: %s", readState_.eventLen_,
        (offset_ + readState_.bufferPos_ + EventSize.sizeof)
      );

      return true;
    }

    return false;
  }

  void performRecovery() {
    // Perform some kickass recovery.
    auto curChunk = getCurChunk();
    if (lastBadChunk_ == curChunk) {
      numCorruptedEventsInChunk_++;
    } else {
      lastBadChunk_ = curChunk;
      numCorruptedEventsInChunk_ = 1;
    }

    if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
      // Maybe there was an error in reading the file from disk, so seek to
      // the beginning of the chunk and try again.
      seekToChunk(curChunk);
    } else {
      // Just skip ahead to the next chunk if we are not already at the last
      // chunk.
      if (curChunk != (getNumChunks() - 1)) {
        seekToChunk(curChunk + 1);
      } else if (readTimeout_ < dur!"hnsecs"(0)) {
        // We are in tailing mode, wait until there is enough data to start
        // the next chunk.
        while (curChunk == (getNumChunks() - 1)) {
          Thread.sleep(corruptedEventSleepDuration_);
        }
        seekToChunk(curChunk + 1);
      } else {
        // Pretty hosed at this stage, rewind the file back to the last
        // successful point and punt on the error.
        readState_.resetState(readState_.lastDispatchPos_);
        currentEvent_ = null;
        currentEventPos_ = 0;

        throw new TTransportException("File corrupted at offset: " ~
          to!string(offset_ + readState_.lastDispatchPos_),
          TTransportException.Type.CORRUPTED_DATA);
      }
    }
  }

  string path_;
  File file_;
  bool isOpen_;
  long offset_;
  ubyte[] currentEvent_;
  size_t currentEventPos_;
  ulong chunkSize_;
  Duration readTimeout_;
  size_t maxEventSize_;

  // Read buffer – lazily allocated on the first read().
  ubyte[] readBuffer_;
  size_t readBufferSize_;

  static struct ReadState {
    ubyte[] event_;
    size_t eventLen_;
    size_t eventPos_;

    // Keep track of the event size while it is being read.
    ubyte[4] eventSizeBuff_;
    ubyte eventSizeBuffPos_;
    bool readingSize_ = true;

    // Read buffer variables.
    size_t bufferPos_;
    size_t bufferLen_;

    // Last successful dispatch point.
    size_t lastDispatchPos_;

    void resetState(size_t lastDispatchPos) {
      readingSize_ = true;
      eventSizeBuffPos_ = 0;
      lastDispatchPos_ = lastDispatchPos;
    }

    void resetAllValues() {
      resetState(0);
      bufferPos_ = 0;
      bufferLen_ = 0;
      event_ = null;
    }
  }
  ReadState readState_;

  ulong lastBadChunk_;
  uint maxCorruptedEvents_;
  uint numCorruptedEventsInChunk_;
  Duration corruptedEventSleepDuration_;
}

/**
 * A transport used to write log files. It can never be read from, calling
 * read() throws.
 *
 * Contrary to the C++ design, explicitly opening the transport/file before
 * using it is necessary to allow manually closing the file without relying
 * on the object lifetime.
 */
final class TFileWriterTransport : TBaseTransport {
  /**
   * Creates a new file writer transport.
   *
   * Params:
   *   path = Path of the file to operate on.
   */
  this(string path) {
    path_ = path;

    chunkSize_ = DEFAULT_CHUNK_SIZE;
    eventBufferSize_ = DEFAULT_EVENT_BUFFER_SIZE;
    ioErrorSleepDuration = DEFAULT_IO_ERROR_SLEEP_DURATION;
    maxFlushBytes_ = DEFAULT_MAX_FLUSH_BYTES;
    maxFlushInterval_ = DEFAULT_MAX_FLUSH_INTERVAL;
  }

  override bool isOpen() @property {
    return isOpen_;
  }

  /**
   * A file writer transport can never be read from.
   */
  override bool peek() {
    return false;
  }

  override void open() {
    if (isOpen) return;

    writerThread_ = spawn(
      &writerThread,
      path_,
      chunkSize_,
      maxFlushBytes_,
      maxFlushInterval_,
      ioErrorSleepDuration_
    );
    setMaxMailboxSize(writerThread_, eventBufferSize_, OnCrowding.block);
    isOpen_ = true;
  }

  /**
   * Closes the transport, i.e. the underlying file and the writer thread.
   */
  override void close() {
    if (!isOpen) return;

    send(writerThread_, ShutdownMessage(), thisTid);
    receive((ShutdownMessage msg, Tid tid){});
    isOpen_ = false;
  }

  /**
   * Enqueues the passed slice of data for writing and immediately returns.
   * write() only blocks if the event buffer has been exhausted.
   *
   * The transport must be open when calling this.
   *
   * Params:
   *   buf = Slice of data to write.
   */
  override void write(in ubyte[] buf) {
    enforce(isOpen, new TTransportException(
      "Cannot write to non-open file.", TTransportException.Type.NOT_OPEN));

    if (buf.empty) {
      logError("Cannot write empty event, skipping.");
      return;
    }

    auto maxSize = chunkSize - EventSize.sizeof;
    enforce(buf.length <= maxSize, new TTransportException(
      "Cannot write more than " ~ to!string(maxSize) ~
      " bytes at once due to chunk size."));

    send(writerThread_, buf.idup);
  }

  /**
   * Flushes any pending data to be written.
   *
   * The transport must be open when calling this.
   *
   * Throws: TTransportException if an error occurs.
   */
  override void flush() {
    enforce(isOpen, new TTransportException(
      "Cannot flush file if not open.", TTransportException.Type.NOT_OPEN));

    send(writerThread_, FlushMessage(), thisTid);
    receive((FlushMessage msg, Tid tid){});
  }

  /**
   * The size of the chunks the file is divided into, in bytes.
   *
   * A single event (write call) never spans multiple chunks – this
   * effectively limits the event size to chunkSize - EventSize.sizeof.
   */
  ulong chunkSize() @property {
    return chunkSize_;
  }

  /// ditto
  void chunkSize(ulong value) @property {
    enforce(!isOpen, new TTransportException(
      "Cannot set chunk size after TFileWriterTransport has been opened."));
    chunkSize_ = value;
  }

  /**
   * The maximum number of write() calls buffered, or zero for no limit.
   *
   * If the buffer is exhausted, write() will block until space becomes
   * available.
   */
  size_t eventBufferSize() @property {
    return eventBufferSize_;
  }

  /// ditto
  void eventBufferSize(size_t value) @property {
    eventBufferSize_ = value;
    if (isOpen) {
      setMaxMailboxSize(writerThread_, value, OnCrowding.throwException);
    }
  }

  /// ditto
  enum DEFAULT_EVENT_BUFFER_SIZE = 10_000;

  /**
   * Maximum number of bytes buffered before writing and flushing the file
   * to disk.
   *
   * Currently cannot be set after the first call to write().
   */
  size_t maxFlushBytes() @property {
    return maxFlushBytes_;
  }

  /// ditto
  void maxFlushBytes(size_t value) @property {
    maxFlushBytes_ = value;
    if (isOpen) {
      send(writerThread_, FlushBytesMessage(value));
    }
  }

  /// ditto
  enum DEFAULT_MAX_FLUSH_BYTES = 1000 * 1024;

  /**
   * Maximum interval between flushing the file to disk.
   *
   * Currently cannot be set after the first call to write().
   */
  Duration maxFlushInterval() @property {
    return maxFlushInterval_;
  }

  /// ditto
  void maxFlushInterval(Duration value) @property {
    maxFlushInterval_ = value;
    if (isOpen) {
      send(writerThread_, FlushIntervalMessage(value));
    }
  }

  /// ditto
  enum DEFAULT_MAX_FLUSH_INTERVAL = dur!"seconds"(3);

  /**
   * When the writer thread encounters an I/O error, it pauses for a short
   * time before trying to reopen the output file. This controls the sleep
   * duration.
   */
  Duration ioErrorSleepDuration() @property {
    return ioErrorSleepDuration_;
  }

  /// ditto
  void ioErrorSleepDuration(Duration value) @property {
    ioErrorSleepDuration_ = value;
    if (isOpen) {
      send(writerThread_, IoErrorSleepDurationMessage(value));
    }
  }

  /// ditto
  enum DEFAULT_IO_ERROR_SLEEP_DURATION = dur!"msecs"(500);

private:
  string path_;
  ulong chunkSize_;
  size_t eventBufferSize_;
  Duration ioErrorSleepDuration_;
  size_t maxFlushBytes_;
  Duration maxFlushInterval_;
  bool isOpen_;
  Tid writerThread_;
}

private {
  // Signals that the file should be flushed to disk. Sent to the writer
  // thread and sent back along with the tid for confirmation.
  struct FlushMessage {}

  // Signals that the writer thread should close the file and shut down. Sent
  // to the writer thread and sent back along with the tid for confirmation.
  struct ShutdownMessage {}

  struct FlushBytesMessage {
    size_t value;
  }

  struct FlushIntervalMessage {
    Duration value;
  }

  struct IoErrorSleepDurationMessage {
    Duration value;
  }

  void writerThread(
    string path,
    ulong chunkSize,
    size_t maxFlushBytes,
    Duration maxFlushInterval,
    Duration ioErrorSleepDuration
  ) {
    bool errorOpening;
    File file;
    ulong offset;
    try {
      // Open the file in appending and binary mode.
      file = File(path, "ab");
      offset = file.tell();
    } catch (Exception e) {
      logError("Error on opening output file in writer thread: %s", e);
      errorOpening = true;
    }

    auto flushTimer = StopWatch(AutoStart.yes);
    size_t unflushedByteCount;

    Tid shutdownRequestTid;
    bool shutdownRequested;
    while (true) {
      if (shutdownRequested) break;

      bool forceFlush;
      Tid flushRequestTid;
      receiveTimeout(max(dur!"hnsecs"(0), maxFlushInterval - flushTimer.peek()),
        (immutable(ubyte)[] data) {
          while (errorOpening) {
            logError("Writer thread going to sleep for %s µs due to IO errors",
              ioErrorSleepDuration.total!"usecs");

            // Sleep for ioErrorSleepDuration, being ready to be interrupted
            // by shutdown requests.
            auto timedOut = receiveTimeout(ioErrorSleepDuration,
              (ShutdownMessage msg, Tid tid){ shutdownRequestTid = tid; });
            if (!timedOut) {
              // We got a shutdown request, just drop all events and exit the
              // main loop so as not to block application shutdown with write
              // attempts which we must assume would fail.
              break;
            }

            try {
              file = File(path, "ab");
              unflushedByteCount = 0;
              errorOpening = false;
              logError("Output file %s reopened during writer thread error " ~
                "recovery", path);
            } catch (Exception e) {
              logError("Unable to reopen output file %s during writer " ~
                "thread error recovery", path);
            }
          }

          // Make sure the event does not cross the chunk boundary by writing
          // a padding consisting of zeroes if it would.
          auto chunk1 = offset / chunkSize;
          auto chunk2 = (offset + EventSize.sizeof + data.length - 1) / chunkSize;

          if (chunk1 != chunk2) {
            // TODO: The C++ implementation refetches the offset here to »keep
            // in sync« – why would this be needed?
            auto padding = cast(size_t)
              ((((offset / chunkSize) + 1) * chunkSize) - offset);
            auto zeroes = new ubyte[padding];
            file.rawWrite(zeroes);
            unflushedByteCount += padding;
            offset += padding;
          }

          // TODO: 2 syscalls here, is this a problem performance-wise?
          // Probably abysmal performance on Windows due to rawWrite
          // implementation.
          uint len = cast(uint)data.length;
          file.rawWrite(cast(ubyte[])(&len)[0..1]);
          file.rawWrite(data);

          auto bytesWritten = EventSize.sizeof + data.length;
          unflushedByteCount += bytesWritten;
          offset += bytesWritten;
        }, (FlushBytesMessage msg) {
          maxFlushBytes = msg.value;
        }, (FlushIntervalMessage msg) {
          maxFlushInterval = msg.value;
        }, (IoErrorSleepDurationMessage msg) {
          ioErrorSleepDuration = msg.value;
        }, (FlushMessage msg, Tid tid) {
          forceFlush = true;
          flushRequestTid = tid;
        }, (OwnerTerminated msg) {
          shutdownRequested = true;
        }, (ShutdownMessage msg, Tid tid) {
          shutdownRequested = true;
          shutdownRequestTid = tid;
        }
      );

      if (errorOpening) continue;

      bool flush;
      if (forceFlush || shutdownRequested || unflushedByteCount > maxFlushBytes) {
        flush = true;
      } else if (cast(Duration)flushTimer.peek() > maxFlushInterval) {
        if (unflushedByteCount == 0) {
          // If the flush timer is due, but no data has been written, don't
          // needlessly fsync, but do reset the timer.
          flushTimer.reset();
        } else {
          flush = true;
        }
      }

      if (flush) {
        file.flush();
        flushTimer.reset();
        unflushedByteCount = 0;
        if (forceFlush) send(flushRequestTid, FlushMessage(), thisTid);
      }
    }

    file.close();

    if (shutdownRequestTid != Tid.init) {
      send(shutdownRequestTid, ShutdownMessage(), thisTid);
    }
  }
}

version (unittest) {
  import core.memory : GC;
  import std.file;
}

unittest {
  void tryRemove(string fileName) {
    try {
      remove(fileName);
    } catch (Exception) {}
  }

  immutable fileName = "unittest.dat.tmp";
  enforce(!exists(fileName), "Unit test output file " ~ fileName ~
    " already exists.");

  /*
   * Check the most basic reading/writing operations.
   */
  {
    scope (exit) tryRemove(fileName);

    auto writer = new TFileWriterTransport(fileName);
    writer.open();
    scope (exit) writer.close();

    writer.write([1, 2]);
    writer.write([3, 4]);
    writer.write([5, 6, 7]);
    writer.flush();

    auto reader = new TFileReaderTransport(fileName);
    reader.open();
    scope (exit) reader.close();

    auto buf = new ubyte[7];
    reader.readAll(buf);
    enforce(buf == [1, 2, 3, 4, 5, 6, 7]);
  }

  /*
   * Check that chunking works as expected.
   */
  {
    scope (exit) tryRemove(fileName);

    static assert(EventSize.sizeof == 4);
    enum CHUNK_SIZE = 10;

    // Write some contents to the file.
    {
      auto writer = new TFileWriterTransport(fileName);
      writer.chunkSize = CHUNK_SIZE;
      writer.open();
      scope (exit) writer.close();

      writer.write([0xde]);
      writer.write([0xad]);
      // Chunk boundary here.
      writer.write([0xbe]);
      // The next write doesn't fit in the five bytes remaining, so we expect
      // padding zero bytes to be written.
      writer.write([0xef, 0x12]);

      try {
        writer.write(new ubyte[CHUNK_SIZE]);
        enforce(false, "Could write event not fitting in a single chunk.");
      } catch (TTransportException e) {}

      writer.flush();
    }

    // Check the raw contents of the file to see if chunk padding was written
    // as expected.
    auto file = File(fileName, "r");
    enforce(file.size == 26);
    auto written = new ubyte[26];
    file.rawRead(written);
    enforce(written == [
      1, 0, 0, 0, 0xde,
      1, 0, 0, 0, 0xad,
      1, 0, 0, 0, 0xbe,
      0, 0, 0, 0, 0,
      2, 0, 0, 0, 0xef, 0x12
    ]);

    // Read the data back in, getting all the events at once.
    {
      auto reader = new TFileReaderTransport(fileName);
      reader.chunkSize = CHUNK_SIZE;
      reader.open();
      scope (exit) reader.close();

      auto buf = new ubyte[5];
      reader.readAll(buf);
      enforce(buf == [0xde, 0xad, 0xbe, 0xef, 0x12]);
    }
  }

  /*
   * Make sure that close() exits "quickly", i.e. that there is no problem
   * with the worker thread waking up.
   */
  {
    import std.conv : text;
    enum NUM_ITERATIONS = 1000;

    uint numOver = 0;
    foreach (n; 0 .. NUM_ITERATIONS) {
      scope (exit) tryRemove(fileName);

      auto transport = new TFileWriterTransport(fileName);
      transport.open();

      // Write something so that the writer thread gets started.
      transport.write(cast(ubyte[])"foo");

      // Every other iteration, also call flush(), just in case that
      // potentially has any effect on how the writer thread wakes up.
      if (n & 0x1) {
        transport.flush();
      }

      // Time the call to close().
      auto sw = StopWatch(AutoStart.yes);
      transport.close();
      sw.stop();

      // If any attempt takes more than 1500 ms, treat that as a fatal failure
      // to avoid looping over a potentially very slow operation.
      enforce(sw.peek().total!"msecs" < 1500,
        text("close() took ", sw.peek().total!"msecs", "ms."));

      // Normally, it takes less than 5 ms on my dev box. However, if the box
      // is heavily loaded, some of the test runs can take longer.
      // Additionally, on a Windows Server 2008 instance running in a
      // VirtualBox VM, it has been observed that about a quarter of the runs
      // takes (217 ± 1) ms, for reasons not yet known.
      if (sw.peek().total!"msecs" > 50) {
        ++numOver;
      }

      // Force garbage collection runs every now and then to make sure we
      // don't run out of OS thread handles.
      if (!(n % 100)) GC.collect();
    }

    // Make sure fewer than a third of the runs took longer than 50 ms.
    enforce(numOver < NUM_ITERATIONS / 3,
      text(numOver, " iterations took more than 50 ms."));
  }
}
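
// An additional sketch of chunk-level navigation: getNumChunks() and
// seekToChunk() on a freshly written file. The file name is arbitrary, and
// the expected sizes follow from the 10-byte chunk layout checked in the
// chunking test above; treat this as an illustrative example rather than an
// exhaustive test.
unittest {
  immutable fileName = "unittest.seek.dat.tmp";
  enforce(!exists(fileName), "Unit test output file " ~ fileName ~
    " already exists.");
  scope (exit) {
    if (exists(fileName)) remove(fileName);
  }

  enum CHUNK_SIZE = 10;

  // Write the same event sequence as in the chunking test above, resulting
  // in a 26 byte file spanning three chunks.
  {
    auto writer = new TFileWriterTransport(fileName);
    writer.chunkSize = CHUNK_SIZE;
    writer.open();
    scope (exit) writer.close();

    writer.write([0xde]);
    writer.write([0xad]);
    writer.write([0xbe]);
    writer.write([0xef, 0x12]);
    writer.flush();
  }

  auto reader = new TFileReaderTransport(fileName);
  reader.chunkSize = CHUNK_SIZE;
  // Do not wait for new data when hitting the end of the file.
  reader.readTimeout = dur!"hnsecs"(0);
  reader.open();
  scope (exit) reader.close();

  // 26 bytes at a chunk size of 10 bytes span three chunks.
  enforce(reader.getNumChunks() == 3);

  // Skip the first chunk and read the events from the remaining ones.
  reader.seekToChunk(1);
  auto buf = new ubyte[3];
  reader.readAll(buf);
  enforce(buf == [0xbe, 0xef, 0x12]);
}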