1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 module thrift.transport.framed;
21 
22 import core.bitop : bswap;
23 import std.algorithm : min;
24 import std.array : empty;
25 import std.exception : enforce;
26 import thrift.transport.base;
27 
/**
 * Framed transport.
 *
 * All writes go into an in-memory buffer until flush() is called, at which
 * point the transport writes the length of the buffered data as a 32-bit
 * integer in network (big-endian) byte order, followed by the data itself.
 * The receiver on the other end can then perform a single »fixed-length«
 * read to get the whole message off the wire.
 */
final class TFramedTransport : TBaseTransport {
  /**
   * Constructs a new framed transport.
   *
   * Params:
   *   transport = The underlying transport to wrap.
   */
  this(TTransport transport) {
    transport_ = transport;
  }

  /**
   * Returns the wrapped transport.
   */
  TTransport underlyingTransport() @property {
    return transport_;
  }

  /**
   * Whether the wrapped transport reports itself as open.
   */
  override bool isOpen() @property {
    return transport_.isOpen;
  }

  /**
   * Returns true if there is unread data left in the current frame, or if
   * the wrapped transport's peek() returns true.
   */
  override bool peek() {
    return rBuf_.length > 0 || transport_.peek();
  }

  /**
   * Opens the wrapped transport.
   */
  override void open() {
    transport_.open();
  }

  /**
   * Flushes any buffered data as a final frame and closes the wrapped
   * transport.
   *
   * The wrapped transport is closed even if the final flush throws, so that
   * a failing flush does not leave the underlying resource open. The
   * exception from flush() is still propagated to the caller.
   */
  override void close() {
    scope (exit) transport_.close();
    flush();
  }

  /**
   * Attempts to read data into the given buffer, stopping when the buffer is
   * exhausted or the frame end is reached.
   *
   * TODO: Contrary to the C++ implementation, this never does cross-frame
   * reads – is there actually a valid use case for that?
   *
   * Params:
   *   buf = Slice to use as buffer.
   *
   * Returns: How many bytes were actually read. 0 is returned if the wrapped
   *   transport had no more data when a new frame header was requested.
   *
   * Throws: TTransportException if an error occurs.
   */
  override size_t read(ubyte[] buf) {
    // If the buffer is empty, read a new frame off the wire.
    if (rBuf_.empty) {
      bool gotFrame = readFrame();
      if (!gotFrame) return 0;
    }

    auto size = min(rBuf_.length, buf.length);
    buf[0..size] = rBuf_[0..size];
    rBuf_ = rBuf_[size..$];
    return size;
  }

  /**
   * Appends the given data to the write buffer. Nothing is sent to the
   * wrapped transport until flush() is called.
   *
   * Params:
   *   buf = The data to buffer.
   */
  override void write(in ubyte[] buf) {
    wBuf_ ~= buf;
  }

  /**
   * Writes the buffered data to the wrapped transport as a single frame
   * (4-byte big-endian length header followed by the payload) and flushes
   * the wrapped transport. Does nothing if no data has been buffered.
   *
   * The write buffer is emptied in any case, even if an exception is thrown.
   *
   * Throws: TTransportException with type BAD_ARGS if the buffered data is
   *   too large for its length to be represented in the frame header, or
   *   whatever the wrapped transport throws on write/flush.
   */
  override void flush() {
    if (wBuf_.empty) return;

    // Properly reset the write buffer even if some of the protocol
    // operations go wrong.
    scope (exit) {
      wBuf_.length = 0;
      wBuf_.assumeSafeAppend();
    }

    // The header is a signed 32-bit integer, and negative values are rejected
    // by the reading side, so refuse to emit a frame whose length would wrap.
    enforce(wBuf_.length <= int.max, new TTransportException(
      "Frame too large to be encoded in a 32-bit length header",
      TTransportException.Type.BAD_ARGS));

    int len = networkOrder(cast(int)wBuf_.length);
    transport_.write(cast(ubyte[])(&len)[0..1]);
    transport_.write(wBuf_);
    transport_.flush();
  }

  /**
   * Returns a view of the unread part of the current frame without consuming
   * it, provided that at least len bytes are available.
   *
   * Params:
   *   buf = Not used by this implementation.
   *   len = The minimum number of bytes the caller needs.
   *
   * Returns: The complete unread remainder of the current frame (which may
   *   be longer than len), or null if fewer than len bytes are left in it.
   */
  override const(ubyte)[] borrow(ubyte* buf, size_t len) {
    if (len <= rBuf_.length) {
      return rBuf_;
    } else {
      // Don't try attempting cross-frame borrows, trying that does not make
      // much sense anyway.
      return null;
    }
  }

  /**
   * Discards the given number of bytes from the unread part of the current
   * frame.
   *
   * Params:
   *   len = Number of bytes to skip.
   *
   * Throws: TTransportException with type BAD_ARGS if len exceeds the number
   *   of unread bytes in the current frame.
   */
  override void consume(size_t len) {
    enforce(len <= rBuf_.length, new TTransportException(
      "Invalid consume length", TTransportException.Type.BAD_ARGS));
    rBuf_ = rBuf_[len .. $];
  }

private:
  /**
   * Converts a 32-bit integer between host and network (big-endian) byte
   * order. The operation is its own inverse, so it is used for both
   * directions. On big-endian hosts the value is already in network order
   * and must not be swapped.
   */
  static int networkOrder(int value) {
    version (BigEndian) {
      return value;
    } else {
      return bswap(value);
    }
  }

  /**
   * Reads the next frame from the wrapped transport into the read buffer.
   *
   * Returns: false if the wrapped transport returned no data at all when the
   *   frame header was requested, true if a frame was read.
   *
   * Throws: TTransportException with type END_OF_FILE if the data ends in
   *   the middle of the frame header, with type CORRUPTED_DATA if the frame
   *   size is negative, or whatever readAll() on the wrapped transport
   *   throws while reading the payload.
   */
  bool readFrame() {
    // Read the size of the next frame. We can't use readAll() since that
    // always throws an exception on EOF, but want to throw an exception only
    // if EOF occurs after partial size data.
    int size;
    size_t size_read;
    while (size_read < size.sizeof) {
      auto data = (cast(ubyte*)&size)[size_read..size.sizeof];
      auto read = transport_.read(data);
      if (read == 0) {
        if (size_read == 0) {
          // EOF before any data was read.
          return false;
        } else {
          // EOF after a partial frame header – illegal.
          throw new TTransportException(
            "No more data to read after partial frame header",
            TTransportException.Type.END_OF_FILE
          );
        }
      }
      size_read += read;
    }

    size = networkOrder(size);
    enforce(size >= 0, new TTransportException("Frame size has negative value",
      TTransportException.Type.CORRUPTED_DATA));

    // If reading the payload fails, do not leave a buffer of the announced
    // size behind: later read()/borrow() calls would otherwise hand out
    // stale or partially filled memory as if it were frame data.
    scope (failure) rBuf_ = null;

    // TODO: Benchmark this.
    rBuf_.length = size;
    rBuf_.assumeSafeAppend();

    transport_.readAll(rBuf_);
    return true;
  }

  TTransport transport_; // The wrapped transport frames are sent over.
  ubyte[] rBuf_;         // Unread remainder of the current incoming frame.
  ubyte[] wBuf_;         // Data written since the last flush().
}
175 
/**
 * Transport factory for TFramedTransport: an instantiation of the
 * TWrapperTransportFactory template (declared outside this module) with
 * TFramedTransport as the wrapper type, intended to wrap given transports
 * into TFramedTransports.
 */
alias TWrapperTransportFactory!TFramedTransport TFramedTransportFactory;
180 
181 version (unittest) {
182   import std.random : Mt19937, uniform;
183   import thrift.transport.memory;
184 }
185 
// Some basic random testing, always starting with the same seed for
// deterministic unit test results – more tests in transport_test.
//
// Frame headers are encoded and decoded byte by byte as big-endian values
// here, so that the test checks the actual wire format independently of the
// byte order and alignment requirements of the host.
unittest {
  // Encodes a frame length as the 4-byte big-endian header used on the wire.
  static ubyte[] encodeHeader(size_t length) {
    auto len = cast(uint)length;
    return [
      cast(ubyte)(len >> 24),
      cast(ubyte)(len >> 16),
      cast(ubyte)(len >> 8),
      cast(ubyte)len
    ];
  }

  // Decodes the 4-byte big-endian frame header at the start of the buffer.
  static size_t decodeHeader(in ubyte[] bytes) {
    enforce(bytes.length >= int.sizeof);
    return (cast(size_t)bytes[0] << 24) | (cast(size_t)bytes[1] << 16) |
      (cast(size_t)bytes[2] << 8) | cast(size_t)bytes[3];
  }

  auto randGen = Mt19937(42);

  // 32 kiB of data to work with.
  auto data = new ubyte[1 << 15];
  foreach (ref b; data) {
    b = uniform!"[]"(cast(ubyte)0, cast(ubyte)255, randGen);
  }

  // Generate a list of chunk sizes to split the data into. A uniform
  // distribution is not quite realistic, but std.random doesn't have anything
  // else yet.
  enum MAX_FRAME_LENGTH = 512;
  auto chunkSizesList = new size_t[][2];
  foreach (ref chunkSizes; chunkSizesList) {
    size_t sum;
    while (true) {
      auto curLen = uniform(0, MAX_FRAME_LENGTH, randGen);
      sum += curLen;
      if (sum > data.length) break;
      chunkSizes ~= curLen;
    }
  }
  chunkSizesList ~= [data.length]; // Also test whole chunk at once.

  // Test writing data.
  {
    foreach (chunkSizes; chunkSizesList) {
      auto buf = new TMemoryBuffer;
      auto framed = new TFramedTransport(buf);

      auto remainingData = data;
      foreach (chunkSize; chunkSizes) {
        framed.write(remainingData[0..chunkSize]);
        remainingData = remainingData[chunkSize..$];
      }
      framed.flush();

      auto writtenData = data[0..($ - remainingData.length)];
      auto actualData = buf.getContents();

      // Check frame size.
      enforce(decodeHeader(actualData) == writtenData.length);

      // Check actual data.
      enforce(actualData[int.sizeof..$] == writtenData);
    }
  }

  // Test reading data.
  {
    foreach (chunkSizes; chunkSizesList) {
      auto buf = new TMemoryBuffer;

      // Hand-craft one frame containing all of the data.
      buf.write(encodeHeader(data.length));
      buf.write(data);

      auto framed = new TFramedTransport(buf);
      ubyte[] readData;
      readData.reserve(data.length);
      foreach (chunkSize; chunkSizes) {
        // This should work with read because we have one huge frame.
        auto oldReadLen = readData.length;
        readData.length += chunkSize;
        framed.read(readData[oldReadLen..$]);
      }

      enforce(readData == data[0..readData.length]);
    }
  }

  // Test combined reading/writing of multiple frames.
  foreach (flushProbability; [1, 2, 4, 8, 16, 32]) {
    foreach (chunkSizes; chunkSizesList) {
      auto buf = new TMemoryBuffer;
      auto framed = new TFramedTransport(buf);

      size_t[] frameSizes;

      // Write the data.
      size_t frameSize;
      auto remainingData = data;
      foreach (chunkSize; chunkSizes) {
        framed.write(remainingData[0..chunkSize]);
        remainingData = remainingData[chunkSize..$];

        frameSize += chunkSize;
        if (frameSize > 0 && uniform(0, flushProbability, randGen) == 0) {
          frameSizes ~= frameSize;
          frameSize = 0;
          framed.flush();
        }
      }
      if (frameSize > 0) {
        frameSizes ~= frameSize;
        frameSize = 0;
        framed.flush();
      }

      // Read it back.
      auto readData = new ubyte[data.length - remainingData.length];
      auto remainToRead = readData;
      foreach (fSize; frameSizes) {
        // We are exploiting an implementation detail of TFramedTransport:
        // The read buffer starts empty and it will never return more than one
        // frame per read, so by just requesting all of the data, we should
        // always get exactly one frame.
        auto got = framed.read(remainToRead);
        enforce(got == fSize);
        remainToRead = remainToRead[fSize..$];
      }

      enforce(remainToRead.empty);
      enforce(readData == data[0..readData.length]);
    }
  }
}
307 
// Check that flush() is a no-op while the write buffer is empty: nothing may
// reach the underlying buffer until data has been written, and repeated
// flushes after a frame has been emitted must not add anything.
unittest {
  auto sink = new TMemoryBuffer();
  auto transport = new TFramedTransport(sink);
  immutable firstFrame = [0, 0, 0, 1, 'a'];
  immutable bothFrames = [0, 0, 0, 1, 'a', 0, 0, 0, 2, 'b', 'c'];

  // Two back-to-back flushes, used to probe for spurious empty frames.
  void flushTwice() {
    transport.flush();
    transport.flush();
  }

  // Nothing written yet: no amount of flushing may produce output.
  transport.flush();
  enforce(sink.getContents() == []);
  flushTwice();
  enforce(sink.getContents() == []);

  // A write alone only buffers; the frame appears on flush, exactly once.
  transport.write(cast(ubyte[])"a");
  enforce(sink.getContents() == []);
  transport.flush();
  enforce(sink.getContents() == firstFrame);
  flushTwice();
  enforce(sink.getContents() == firstFrame);

  // Same again for a second, two-byte frame appended after the first.
  transport.write(cast(ubyte[])"bc");
  enforce(sink.getContents() == firstFrame);
  transport.flush();
  enforce(sink.getContents() == bothFrames);
  flushTwice();
  enforce(sink.getContents() == bothFrames);
}