/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

module thrift.transport.framed;

import core.bitop : bswap;
import std.algorithm : min;
import std.array : empty;
import std.exception : enforce;
import thrift.transport.base;

/**
 * Framed transport.
 *
 * All writes go into an in-memory buffer until flush is called, at which point
 * the transport writes the length of the entire binary chunk followed by the
 * data payload. The receiver on the other end then performs a single
 * »fixed-length« read to get the whole message off the wire.
 */
final class TFramedTransport : TBaseTransport {
  /**
   * Constructs a new framed transport.
   *
   * Params:
   *   transport = The underlying transport to wrap.
   */
  this(TTransport transport) {
    transport_ = transport;
  }

  /**
   * Returns the wrapped transport.
   */
  TTransport underlyingTransport() @property {
    return transport_;
  }

  /**
   * Whether the wrapped transport is open.
   */
  override bool isOpen() @property {
    return transport_.isOpen;
  }

  /**
   * Returns true if there is still unread data from the current frame, or if
   * the wrapped transport reports that more data is available.
   */
  override bool peek() {
    return rBuf_.length > 0 || transport_.peek();
  }

  /**
   * Opens the wrapped transport.
   */
  override void open() {
    transport_.open();
  }

  /**
   * Flushes any buffered frame and closes the wrapped transport.
   *
   * The wrapped transport is closed even if flushing fails; the flush error
   * is still propagated to the caller.
   */
  override void close() {
    // Registered before flush() so that a failing flush cannot leave the
    // underlying transport open.
    scope (exit) transport_.close();
    flush();
  }

  /**
   * Attempts to read data into the given buffer, stopping when the buffer is
   * exhausted or the frame end is reached.
   *
   * TODO: Contrary to the C++ implementation, this never does cross-frame
   * reads – is there actually a valid use case for that?
   *
   * Params:
   *   buf = Slice to use as buffer.
   *
   * Returns: How many bytes were actually read.
   *
   * Throws: TTransportException if an error occurs.
   */
  override size_t read(ubyte[] buf) {
    // If the buffer is empty, read a new frame off the wire.
    if (rBuf_.empty) {
      bool gotFrame = readFrame();
      if (!gotFrame) return 0;
    }

    auto size = min(rBuf_.length, buf.length);
    buf[0..size] = rBuf_[0..size];
    rBuf_ = rBuf_[size..$];
    return size;
  }

  /**
   * Appends the given data to the pending frame. Nothing is sent to the
   * wrapped transport until flush() is called.
   *
   * Params:
   *   buf = The data to append.
   */
  override void write(in ubyte[] buf) {
    wBuf_ ~= buf;
  }

  /**
   * Sends the buffered data as a single frame (32 bit length header followed
   * by the payload) and flushes the wrapped transport. Does nothing if no
   * data has been written since the last flush.
   *
   * Throws: TTransportException if the buffered data does not fit into a
   *   single frame, or if the wrapped transport fails.
   */
  override void flush() {
    if (wBuf_.empty) return;

    // Properly reset the write buffer even if some of the protocol operations
    // go wrong.
    scope (exit) {
      wBuf_.length = 0;
      wBuf_.assumeSafeAppend();
    }

    // The frame header is a signed 32 bit integer; refuse to send anything
    // whose length would be silently truncated by the cast below.
    enforce(wBuf_.length <= int.max, new TTransportException(
      "Frame too large to be sent", TTransportException.Type.BAD_ARGS));

    // Note: bswap swaps unconditionally, so the header only ends up in
    // big-endian order on little-endian hosts.
    int len = bswap(cast(int)wBuf_.length);
    transport_.write(cast(ubyte[])(&len)[0..1]);
    transport_.write(wBuf_);
    transport_.flush();
  }

  /**
   * Returns the unread rest of the current frame if it holds at least len
   * bytes, without consuming it.
   *
   * Params:
   *   buf = Unused, the data is always served from the internal buffer.
   *   len = The minimum number of bytes requested.
   *
   * Returns: The remaining data of the current frame, or null if fewer than
   *   len bytes are buffered.
   */
  override const(ubyte)[] borrow(ubyte* buf, size_t len) {
    if (len <= rBuf_.length) {
      return rBuf_;
    } else {
      // Don't try attempting cross-frame borrows, trying that does not make
      // much sense anyway.
      return null;
    }
  }

  /**
   * Discards len bytes from the front of the current frame.
   *
   * Params:
   *   len = The number of bytes to discard.
   *
   * Throws: TTransportException if len exceeds the buffered data.
   */
  override void consume(size_t len) {
    enforce(len <= rBuf_.length, new TTransportException(
      "Invalid consume length", TTransportException.Type.BAD_ARGS));
    rBuf_ = rBuf_[len .. $];
  }

private:
  /**
   * Reads the next frame from the wrapped transport into the read buffer.
   *
   * Returns: false if the wrapped transport hit EOF before any header byte
   *   was read, true if a complete frame was read.
   *
   * Throws: TTransportException on EOF inside the frame header, on a negative
   *   frame size, or if reading the payload fails.
   */
  bool readFrame() {
    // Read the size of the next frame. We can't use readAll() since that
    // always throws an exception on EOF, but want to throw an exception only
    // if EOF occurs after partial size data.
    int size;
    size_t bytesRead;
    while (bytesRead < size.sizeof) {
      auto data = (cast(ubyte*)&size)[bytesRead..size.sizeof];
      auto got = transport_.read(data);
      if (got == 0) {
        if (bytesRead == 0) {
          // EOF before any data was read.
          return false;
        } else {
          // EOF after a partial frame header – illegal.
          throw new TTransportException(
            "No more data to read after partial frame header",
            TTransportException.Type.END_OF_FILE
          );
        }
      }
      bytesRead += got;
    }

    size = bswap(size);
    enforce(size >= 0, new TTransportException("Frame size has negative value",
      TTransportException.Type.CORRUPTED_DATA));

    // TODO: Benchmark this.
    rBuf_.length = size;
    rBuf_.assumeSafeAppend();

    // If the payload cannot be read completely, drop the partially filled
    // buffer so that later reads do not hand out its contents as frame data.
    scope (failure) {
      rBuf_.length = 0;
      rBuf_.assumeSafeAppend();
    }

    transport_.readAll(rBuf_);
    return true;
  }

  TTransport transport_;
  ubyte[] rBuf_;
  ubyte[] wBuf_;
}

/**
 * Wraps given transports into TFramedTransports.
 */
alias TWrapperTransportFactory!TFramedTransport TFramedTransportFactory;

version (unittest) {
  import std.random : Mt19937, uniform;
  import thrift.transport.memory;
}

// Some basic random testing, always starting with the same seed for
// deterministic unit test results – more tests in transport_test.
unittest {
  auto randGen = Mt19937(42);

  // 32 kiB of data to work with.
  auto data = new ubyte[1 << 15];
  foreach (ref b; data) {
    b = uniform!"[]"(cast(ubyte)0, cast(ubyte)255, randGen);
  }

  // Generate a list of chunk sizes to split the data into. A uniform
  // distribution is not quite realistic, but std.random doesn't have anything
  // else yet.
  enum MAX_FRAME_LENGTH = 512;
  auto chunkSizesList = new size_t[][2];
  foreach (ref chunkSizes; chunkSizesList) {
    size_t sum;
    while (true) {
      auto curLen = uniform(0, MAX_FRAME_LENGTH, randGen);
      sum += curLen;
      if (sum > data.length) break;
      chunkSizes ~= curLen;
    }
  }
  chunkSizesList ~= [data.length]; // Also test whole chunk at once.

  // Test writing data.
  {
    foreach (chunkSizes; chunkSizesList) {
      auto buf = new TMemoryBuffer;
      auto framed = new TFramedTransport(buf);

      auto remainingData = data;
      foreach (chunkSize; chunkSizes) {
        framed.write(remainingData[0..chunkSize]);
        remainingData = remainingData[chunkSize..$];
      }
      framed.flush();

      auto writtenData = data[0..($ - remainingData.length)];
      auto actualData = buf.getContents();

      // Check frame size.
      int frameSize = bswap((cast(int[])(actualData[0..int.sizeof]))[0]);
      enforce(frameSize == writtenData.length);

      // Check actual data.
      enforce(actualData[int.sizeof..$] == writtenData);
    }
  }

  // Test reading data.
  {
    foreach (chunkSizes; chunkSizesList) {
      auto buf = new TMemoryBuffer;

      auto size = bswap(cast(int)data.length);
      buf.write(cast(ubyte[])(&size)[0..1]);
      buf.write(data);

      auto framed = new TFramedTransport(buf);
      ubyte[] readData;
      readData.reserve(data.length);
      foreach (chunkSize; chunkSizes) {
        // This should work with read because we have one huge frame.
        auto oldReadLen = readData.length;
        readData.length += chunkSize;
        framed.read(readData[oldReadLen..$]);
      }

      enforce(readData == data[0..readData.length]);
    }
  }

  // Test combined reading/writing of multiple frames.
  foreach (flushProbability; [1, 2, 4, 8, 16, 32]) {
    foreach (chunkSizes; chunkSizesList) {
      auto buf = new TMemoryBuffer;
      auto framed = new TFramedTransport(buf);

      size_t[] frameSizes;

      // Write the data.
      size_t frameSize;
      auto remainingData = data;
      foreach (chunkSize; chunkSizes) {
        framed.write(remainingData[0..chunkSize]);
        remainingData = remainingData[chunkSize..$];

        frameSize += chunkSize;
        if (frameSize > 0 && uniform(0, flushProbability, randGen) == 0) {
          frameSizes ~= frameSize;
          frameSize = 0;
          framed.flush();
        }
      }
      if (frameSize > 0) {
        frameSizes ~= frameSize;
        frameSize = 0;
        framed.flush();
      }

      // Read it back.
      auto readData = new ubyte[data.length - remainingData.length];
      auto remainToRead = readData;
      foreach (fSize; frameSizes) {
        // We are exploiting an implementation detail of TFramedTransport:
        // The read buffer starts empty and it will never return more than one
        // frame per read, so by just requesting all of the data, we should
        // always get exactly one frame.
        auto got = framed.read(remainToRead);
        enforce(got == fSize);
        remainToRead = remainToRead[fSize..$];
      }

      enforce(remainToRead.empty);
      enforce(readData == data[0..readData.length]);
    }
  }
}

// Test flush()ing an empty buffer.
unittest {
  // Flushing with nothing buffered must not emit a frame; flushing after a
  // write must emit exactly one length-prefixed frame.
  auto memory = new TMemoryBuffer();
  auto transport = new TFramedTransport(memory);

  // Expected wire contents after the first and after the second frame.
  immutable afterFirstFrame = [0, 0, 0, 1, 'a'];
  immutable afterSecondFrame = [0, 0, 0, 1, 'a', 0, 0, 0, 2, 'b', 'c'];

  // Calls flush() the given number of times in a row.
  void flushTimes(size_t count) {
    foreach (i; 0 .. count) {
      transport.flush();
    }
  }

  // Nothing has been written yet, so no amount of flushing produces output.
  flushTimes(1);
  enforce(memory.getContents() == []);
  flushTimes(2);
  enforce(memory.getContents() == []);

  // A write alone stays buffered; only the flush puts the frame on the wire.
  transport.write(cast(ubyte[])"a");
  enforce(memory.getContents() == []);
  flushTimes(1);
  enforce(memory.getContents() == afterFirstFrame);

  // Further flushes with an empty write buffer leave the output untouched.
  flushTimes(2);
  enforce(memory.getContents() == afterFirstFrame);

  // The same holds for a second frame.
  transport.write(cast(ubyte[])"bc");
  enforce(memory.getContents() == afterFirstFrame);
  flushTimes(1);
  enforce(memory.getContents() == afterSecondFrame);
  flushTimes(2);
  enforce(memory.getContents() == afterSecondFrame);
}