1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.transport.piped;
20 
21 import thrift.transport.base;
22 import thrift.transport.memory;
23 
24 /**
 * Copies the data read from and/or written to one transport to a second
 * transport when readEnd() or writeEnd() is called.
27  *
28  * A typical use case would be to log requests on e.g. a socket to
29  * disk (i. e. pipe them to a TFileWriterTransport).
30  *
31  * The implementation keeps an internal buffer which expands to
32  * hold the whole amount of data read/written until the corresponding *End()
33  * method is called.
34  *
35  * Contrary to the C++ implementation, this doesn't introduce yet another layer
36  * of input/output buffering, all calls are passed to the underlying source
37  * transport verbatim.
38  */
final class TPipedTransport(Source = TTransport) if (
  isTTransport!Source
) : TBaseTransport {
  /// The default initial buffer size if not explicitly specified, in bytes.
  enum DEFAULT_INITIAL_BUFFER_SIZE = 512;

  /**
   * Constructs a new instance.
   *
   * By default, only reads are piped (pipeReads = true, pipeWrites = false).
   *
   * Params:
   *   srcTrans = The transport to which all requests are forwarded.
   *   dstTrans = The transport the read/written data is copied to.
   *   initialBufferSize = The default size of the read/write buffers, for
   *     performance tuning.
   */
  this(Source srcTrans, TTransport dstTrans,
    size_t initialBufferSize = DEFAULT_INITIAL_BUFFER_SIZE
  ) {
    srcTrans_ = srcTrans;
    dstTrans_ = dstTrans;

    readBuffer_ = new TMemoryBuffer(initialBufferSize);
    writeBuffer_ = new TMemoryBuffer(initialBufferSize);

    pipeReads_ = true;
    pipeWrites_ = false;
  }

  /**
   * Whether data read from the source transport is copied to the destination
   * transport on readEnd().
   */
  bool pipeReads() @property const {
    return pipeReads_;
  }

  /**
   * Enables or disables piping of read data.
   *
   * Disabling it discards any read data captured so far, so it is not piped
   * should the setting be turned on again later.
   */
  void pipeReads(bool value) @property {
    if (!value) {
      readBuffer_.reset();
    }
    pipeReads_ = value;
  }

  /**
   * Whether data written to the source transport is copied to the
   * destination transport on writeEnd().
   */
  bool pipeWrites() @property const {
    return pipeWrites_;
  }

  /**
   * Enables or disables piping of written data.
   *
   * Disabling it discards any written data captured so far, so it is not
   * piped should the setting be turned on again later.
   */
  void pipeWrites(bool value) @property {
    if (!value) {
      writeBuffer_.reset();
    }
    pipeWrites_ = value;
  }

  /// Forwarded to the source transport.
  override bool isOpen() {
    return srcTrans_.isOpen();
  }

  /// Forwarded to the source transport.
  override bool peek() {
    return srcTrans_.peek();
  }

  /// Forwarded to the source transport; the destination is not touched.
  override void open() {
    srcTrans_.open();
  }

  /// Forwarded to the source transport; the destination is not touched.
  override void close() {
    srcTrans_.close();
  }

  /**
   * Reads from the source transport and, if read piping is enabled, records
   * the bytes actually read for the next readEnd() call.
   *
   * Returns: The number of bytes the source transport reported as read.
   */
  override size_t read(ubyte[] buf) {
    auto bytesRead = srcTrans_.read(buf);

    if (pipeReads_) {
      // Only the filled part of buf is valid data.
      readBuffer_.write(buf[0 .. bytesRead]);
    }

    return bytesRead;
  }

  /**
   * Ends the current read: if read piping is enabled, writes everything
   * captured since the last readEnd() to the destination transport and
   * flushes it, then forwards readEnd() to the source transport.
   *
   * Returns: The number of bytes piped if read piping is enabled, the result
   *   of the source transport's readEnd() otherwise.
   */
  override size_t readEnd() {
    if (pipeReads_) {
      auto piped = pipeBuffered(readBuffer_);

      srcTrans_.readEnd();

      // Return the piped length instead of the readEnd() result of the source
      // transport because it might not be available from it.
      return piped;
    }

    return srcTrans_.readEnd();
  }

  /**
   * Records the data for the next writeEnd() call if write piping is
   * enabled, then writes it to the source transport.
   */
  override void write(in ubyte[] buf) {
    if (pipeWrites_) {
      writeBuffer_.write(buf);
    }

    srcTrans_.write(buf);
  }

  /**
   * Ends the current write: if write piping is enabled, writes everything
   * captured since the last writeEnd() to the destination transport and
   * flushes it, then forwards writeEnd() to the source transport.
   *
   * Returns: The number of bytes piped if write piping is enabled, the
   *   result of the source transport's writeEnd() otherwise.
   */
  override size_t writeEnd() {
    if (pipeWrites_) {
      auto piped = pipeBuffered(writeBuffer_);

      srcTrans_.writeEnd();

      // Return the piped length instead of the writeEnd() result of the
      // source transport because it might not be available from it.
      return piped;
    }

    return srcTrans_.writeEnd();
  }

  /// Forwarded to the source transport.
  override void flush() {
    srcTrans_.flush();
  }

private:
  /**
   * Writes the contents of the given capture buffer to the destination
   * transport, flushes the destination and empties the buffer.
   *
   * The buffer is emptied even if the destination transport throws, so that
   * data from a failed message is not prepended to the next one.
   *
   * Returns: The number of bytes that were held in the buffer.
   */
  size_t pipeBuffered(TMemoryBuffer buffer) {
    auto data = buffer.getContents();
    scope (exit) buffer.reset();

    dstTrans_.write(data);
    dstTrans_.flush();

    return data.length;
  }

  Source srcTrans_;
  TTransport dstTrans_;

  TMemoryBuffer readBuffer_;
  TMemoryBuffer writeBuffer_;

  bool pipeReads_;
  bool pipeWrites_;
}
173 
/**
 * TPipedTransport construction helper to avoid having to explicitly
 * specify the transport types, i.e. to allow the constructor being called
 * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
 *
 * Params:
 *   srcTrans = The transport to which all requests are forwarded.
 *   dstTrans = The transport the read/written data is copied to.
 *   initialBufferSize = The initial size of the read/write buffers, passed
 *     through to the TPipedTransport constructor. Defaults to
 *     TPipedTransport.DEFAULT_INITIAL_BUFFER_SIZE.
 *
 * Returns: A new TPipedTransport wrapping srcTrans.
 */
TPipedTransport!Source tPipedTransport(Source)(
  Source srcTrans, TTransport dstTrans,
  size_t initialBufferSize =
    TPipedTransport!(Source).DEFAULT_INITIAL_BUFFER_SIZE
) if (isTTransport!Source) {
  return new typeof(return)(srcTrans, dstTrans, initialBufferSize);
}
184 
185 version (unittest) {
186   // DMD @@BUG@@: UFCS for std.array.empty doesn't work when import is moved
187   // into unittest block.
188   import std.array;
189   import std.exception : enforce;
190 }
191 
unittest {
  // Read piping: data must only show up in the sink once readEnd() is called,
  // and each readEnd() must deliver exactly what was read since the last one.
  auto source = new TMemoryBuffer;
  auto sink = new TMemoryBuffer;
  auto piped = tPipedTransport(source, sink);

  ubyte[4] scratch;
  auto pair = scratch[0 .. 2];

  source.write(cast(ubyte[])"abcd");

  // First message: a single two-byte read.
  piped.readAll(pair);
  enforce(pair == "ab");
  enforce(sink.getContents().empty);

  piped.readEnd();
  enforce(sink.getContents() == "ab");
  sink.reset();

  // Second message: two reads, the latter consuming freshly appended data.
  source.write(cast(ubyte[])"ef");

  piped.readAll(pair);
  enforce(pair == "cd");
  enforce(sink.getContents().empty);

  piped.readAll(pair);
  enforce(pair == "ef");
  enforce(sink.getContents().empty);

  piped.readEnd();
  enforce(sink.getContents() == "cdef");
}