/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module thrift.transport.piped;

import thrift.transport.base;
import thrift.transport.memory;

/**
 * Pipes the data read from and/or written to one transport to another
 * transport when readEnd() or writeEnd() is called.
 *
 * A typical use case would be to log requests on e.g. a socket to
 * disk (i.e. pipe them to a TFileWriterTransport).
 *
 * The implementation keeps an internal buffer which expands to
 * hold the whole amount of data read/written until the corresponding *End()
 * method is called.
 *
 * Contrary to the C++ implementation, this doesn't introduce yet another layer
 * of input/output buffering, all calls are passed to the underlying source
 * transport verbatim.
 */
final class TPipedTransport(Source = TTransport) if (
  isTTransport!Source
) : TBaseTransport {
  /// The default initial buffer size if not explicitly specified, in bytes.
  enum DEFAULT_INITIAL_BUFFER_SIZE = 512;

  /**
   * Constructs a new instance.
   *
   * By default, only reads are piped (pipeReads = true, pipeWrites = false).
   *
   * Params:
   *   srcTrans = The transport to which all requests are forwarded.
   *   dstTrans = The transport the read/written data is copied to.
   *   initialBufferSize = The initial size of the internal read and write
   *     buffers, for performance tuning.
   */
  this(Source srcTrans, TTransport dstTrans,
    size_t initialBufferSize = DEFAULT_INITIAL_BUFFER_SIZE
  ) {
    srcTrans_ = srcTrans;
    dstTrans_ = dstTrans;

    readBuffer_ = new TMemoryBuffer(initialBufferSize);
    writeBuffer_ = new TMemoryBuffer(initialBufferSize);

    pipeReads_ = true;
    pipeWrites_ = false;
  }

  /// Whether data read through this transport is copied to the target.
  bool pipeReads() @property const {
    return pipeReads_;
  }

  /**
   * Enables or disables piping of read data.
   *
   * Disabling discards whatever has been collected in the read buffer so
   * far, so it will not be sent to the target by a later readEnd().
   */
  void pipeReads(bool value) @property {
    if (!value) {
      readBuffer_.reset();
    }
    pipeReads_ = value;
  }

  /// Whether data written through this transport is copied to the target.
  bool pipeWrites() @property const {
    return pipeWrites_;
  }

  /**
   * Enables or disables piping of written data.
   *
   * Disabling discards whatever has been collected in the write buffer so
   * far, so it will not be sent to the target by a later writeEnd().
   */
  void pipeWrites(bool value) @property {
    if (!value) {
      writeBuffer_.reset();
    }
    pipeWrites_ = value;
  }

  /// Forwarded to the source transport.
  override bool isOpen() {
    return srcTrans_.isOpen();
  }

  /// Forwarded to the source transport.
  override bool peek() {
    return srcTrans_.peek();
  }

  /// Forwarded to the source transport; the target transport is not touched.
  override void open() {
    srcTrans_.open();
  }

  /// Forwarded to the source transport; the target transport is not touched.
  override void close() {
    srcTrans_.close();
  }

  /**
   * Reads from the source transport and, if read piping is enabled, records
   * the bytes actually read for the next readEnd() call.
   *
   * Returns: The number of bytes the source transport reported as read.
   */
  override size_t read(ubyte[] buf) {
    auto bytesRead = srcTrans_.read(buf);

    if (pipeReads_) {
      // Only the filled prefix of buf is valid data.
      readBuffer_.write(buf[0 .. bytesRead]);
    }

    return bytesRead;
  }

  /**
   * Sends everything recorded since the last readEnd() to the target
   * transport (if read piping is enabled) and signals readEnd() to the
   * source transport.
   *
   * Returns: The number of bytes piped if read piping is enabled, the
   *   readEnd() result of the source transport otherwise.
   */
  override size_t readEnd() {
    if (!pipeReads_) {
      return srcTrans_.readEnd();
    }

    auto piped = pipeBuffered(readBuffer_);
    srcTrans_.readEnd();

    // Return the piped length instead of the readEnd() result of the source
    // transport because it might not be available from it.
    return piped;
  }

  /**
   * Records the data for the next writeEnd() call if write piping is
   * enabled, then writes it to the source transport.
   */
  override void write(in ubyte[] buf) {
    if (pipeWrites_) {
      writeBuffer_.write(buf);
    }

    srcTrans_.write(buf);
  }

  /**
   * Sends everything recorded since the last writeEnd() to the target
   * transport (if write piping is enabled) and signals writeEnd() to the
   * source transport.
   *
   * Returns: The number of bytes piped if write piping is enabled, the
   *   writeEnd() result of the source transport otherwise.
   */
  override size_t writeEnd() {
    if (!pipeWrites_) {
      return srcTrans_.writeEnd();
    }

    auto piped = pipeBuffered(writeBuffer_);
    srcTrans_.writeEnd();

    // Return the piped length instead of the writeEnd() result of the source
    // transport because it might not be available from it.
    return piped;
  }

  /// Flushes the source transport only; the target is flushed by *End().
  override void flush() {
    srcTrans_.flush();
  }

private:
  /**
   * Writes the current contents of buffer to the target transport, flushes
   * the target and then empties buffer.
   *
   * Returns: The number of bytes that were handed to the target transport.
   */
  size_t pipeBuffered(TMemoryBuffer buffer) {
    auto data = buffer.getContents();
    dstTrans_.write(data);
    dstTrans_.flush();
    buffer.reset();
    return data.length;
  }

  Source srcTrans_;
  TTransport dstTrans_;

  TMemoryBuffer readBuffer_;
  TMemoryBuffer writeBuffer_;

  bool pipeReads_;
  bool pipeWrites_;
}

/**
 * TPipedTransport construction helper to avoid having to explicitly
 * specify the transport types, i.e. to allow the constructor being called
 * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
 *
 * Params:
 *   srcTrans = The transport to which all requests are forwarded.
 *   dstTrans = The transport the read/written data is copied to.
 *   initialBufferSize = The initial size of the internal read and write
 *     buffers; defaults to the same value the constructor uses.
 */
TPipedTransport!Source tPipedTransport(Source)(
  Source srcTrans, TTransport dstTrans,
  size_t initialBufferSize =
    TPipedTransport!(Source).DEFAULT_INITIAL_BUFFER_SIZE
) if (isTTransport!Source) {
  return new typeof(return)(srcTrans, dstTrans, initialBufferSize);
}

version (unittest) {
  // DMD @@BUG@@: UFCS for std.array.empty doesn't work when import is moved
  // into unittest block.
  import std.array;
  import std.exception : enforce;
}

// Read piping (the default configuration).
unittest {
  auto underlying = new TMemoryBuffer;
  auto pipeTarget = new TMemoryBuffer;
  auto trans = tPipedTransport(underlying, pipeTarget);

  underlying.write(cast(ubyte[])"abcd");

  ubyte[4] buffer;
  trans.readAll(buffer[0 .. 2]);
  enforce(buffer[0 .. 2] == "ab");
  enforce(pipeTarget.getContents().empty);

  trans.readEnd();
  enforce(pipeTarget.getContents() == "ab");
  pipeTarget.reset();

  underlying.write(cast(ubyte[])"ef");
  trans.readAll(buffer[0 .. 2]);
  enforce(buffer[0 .. 2] == "cd");
  enforce(pipeTarget.getContents().empty);

  trans.readAll(buffer[0 .. 2]);
  enforce(buffer[0 .. 2] == "ef");
  enforce(pipeTarget.getContents().empty);

  trans.readEnd();
  enforce(pipeTarget.getContents() == "cdef");
}

// Write piping, using an explicit initial buffer size via the helper.
unittest {
  auto underlying = new TMemoryBuffer;
  auto pipeTarget = new TMemoryBuffer;
  auto trans = tPipedTransport(underlying, pipeTarget, 16);
  trans.pipeReads = false;
  trans.pipeWrites = true;

  trans.write(cast(ubyte[])"abcd");
  enforce(underlying.getContents() == "abcd");
  enforce(pipeTarget.getContents().empty);

  enforce(trans.writeEnd() == 4);
  enforce(pipeTarget.getContents() == "abcd");
}