/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

module thrift.transport.zlib;

import core.bitop : bswap;
import etc.c.zlib;
import std.algorithm : min;
import std.array : empty;
import std.conv : to;
import std.exception : enforce;
import thrift.base;
import thrift.transport.base;

/**
 * zlib transport. Compresses (deflates) data before writing it to the
 * underlying transport, and decompresses (inflates) it after reading.
 */
final class TZlibTransport : TBaseTransport {
  // These defaults have yet to be optimized.
  enum DEFAULT_URBUF_SIZE = 128;
  enum DEFAULT_CRBUF_SIZE = 1024;
  enum DEFAULT_UWBUF_SIZE = 128;
  enum DEFAULT_CWBUF_SIZE = 1024;

  /**
   * Constructs a new zlib transport.
   *
   * Params:
   *   transport = The underlying transport to wrap.
   *   urbufSize = The size of the uncompressed reading buffer, in bytes.
   *   crbufSize = The size of the compressed reading buffer, in bytes.
   *   uwbufSize = The size of the uncompressed writing buffer, in bytes.
   *     Must be at least MIN_DIRECT_DEFLATE_SIZE bytes.
   *   cwbufSize = The size of the compressed writing buffer, in bytes.
   *
   * Throws: TTransportException (BAD_ARGS) if uwbufSize is too small,
   *   TZlibException if zlib fails to initialize one of the streams.
   */
  this(
    TTransport transport,
    size_t urbufSize = DEFAULT_URBUF_SIZE,
    size_t crbufSize = DEFAULT_CRBUF_SIZE,
    size_t uwbufSize = DEFAULT_UWBUF_SIZE,
    size_t cwbufSize = DEFAULT_CWBUF_SIZE
  ) {
    transport_ = transport;

    enforce(uwbufSize >= MIN_DIRECT_DEFLATE_SIZE, new TTransportException(
      "TZlibTransport: uncompressed write buffer must be at least " ~
      to!string(MIN_DIRECT_DEFLATE_SIZE) ~ " bytes in size.",
      TTransportException.Type.BAD_ARGS));

    urbuf_ = new ubyte[urbufSize];
    crbuf_ = new ubyte[crbufSize];
    uwbuf_ = new ubyte[uwbufSize];
    cwbuf_ = new ubyte[cwbufSize];

    rstream_ = new z_stream;
    rstream_.next_in = crbuf_.ptr;
    rstream_.avail_in = 0;
    rstream_.next_out = urbuf_.ptr;
    rstream_.avail_out = to!uint(urbuf_.length);

    wstream_ = new z_stream;
    wstream_.next_in = uwbuf_.ptr;
    wstream_.avail_in = 0;
    wstream_.next_out = cwbuf_.ptr;
    // The output space announced to deflate must match the buffer next_out
    // points to (cwbuf_), otherwise zlib can write past its end.
    wstream_.avail_out = to!uint(cwbuf_.length);

    // If initialization fails below, the object might still be finalized
    // later (NOTE(review): depends on the GC finalizing partially constructed
    // objects). Clear the stream pointers so the destructor does not try to
    // end streams that were never set up or have already been ended here.
    scope (failure) {
      rstream_ = null;
      wstream_ = null;
    }

    zlibEnforce(inflateInit(rstream_), rstream_);
    scope (failure) {
      zlibLogError(inflateEnd(rstream_), rstream_);
    }

    zlibEnforce(deflateInit(wstream_, Z_DEFAULT_COMPRESSION), wstream_);
  }

  /**
   * Releases the zlib stream state. Errors are only logged, never thrown.
   */
  ~this() {
    // The pointers are null if the constructor failed before (or while)
    // setting the streams up; there is nothing to end in that case.
    if (rstream_ !is null) {
      zlibLogError(inflateEnd(rstream_), rstream_);
    }

    if (wstream_ !is null) {
      auto result = deflateEnd(wstream_);
      // Z_DATA_ERROR may indicate unflushed data, so just ignore it.
      if (result != Z_DATA_ERROR) {
        zlibLogError(result, wstream_);
      }
    }
  }

  /**
   * Returns the wrapped transport.
   */
  TTransport underlyingTransport() @property {
    return transport_;
  }

  /// Open as long as decompressed data is buffered or the wrapped transport
  /// is open.
  override bool isOpen() @property {
    return readAvail > 0 || transport_.isOpen;
  }

  /// True if decompressed data is buffered or the wrapped transport has
  /// data available.
  override bool peek() {
    return readAvail > 0 || transport_.peek();
  }

  /// Opens the wrapped transport.
  override void open() {
    transport_.open();
  }

  /// Closes the wrapped transport.
  override void close() {
    transport_.close();
  }

  /**
   * Reads up to buf.length decompressed bytes into buf.
   *
   * Returns: The number of bytes actually read. This can be less than
   *   buf.length if the end of the zlib stream was reached, if the underlying
   *   transport returned no data, or if some data has already been returned
   *   and zlib has no compressed input left (so no further read from the
   *   underlying transport is attempted).
   */
  override size_t read(ubyte[] buf) {
    // The C++ implementation suggests to skip urbuf on big reads in future
    // versions, we would benefit from it as well.
    auto origLen = buf.length;
    while (true) {
      // First hand out whatever is already decompressed in urbuf.
      auto give = min(readAvail, buf.length);

      // If std.range.put was optimized for slicable ranges, it could be used
      // here as well.
      buf[0 .. give] = urbuf_[urpos_ .. urpos_ + give];
      buf = buf[give .. $];
      urpos_ += give;

      auto need = buf.length;
      if (need == 0) {
        // We managed to get all the data requested.
        return origLen;
      }

      if (inputEnded_ || (need < origLen && rstream_.avail_in == 0)) {
        // We didn't fill buf completely, but there is no more data available
        // without another read from the underlying transport.
        return origLen - need;
      }

      // Refill our buffer by reading more data through zlib. At this point
      // urbuf has been drained completely, so it can be reused from the start.
      rstream_.next_out = urbuf_.ptr;
      rstream_.avail_out = to!uint(urbuf_.length);
      urpos_ = 0;

      if (!readFromZlib()) {
        // Couldn't get more data from the underlying transport.
        return origLen - need;
      }
    }
  }

  /**
   * Compresses buf. Small writes are accumulated in uwbuf first; writes of at
   * least MIN_DIRECT_DEFLATE_SIZE bytes are handed to zlib directly.
   *
   * Throws: TTransportException (BAD_ARGS) if called after finish().
   */
  override void write(in ubyte[] buf) {
    enforce(!outputFinished_, new TTransportException(
      "write() called after finish()", TTransportException.Type.BAD_ARGS));

    auto len = buf.length;
    if (len >= MIN_DIRECT_DEFLATE_SIZE) {
      // Keep the byte order: pending buffered data goes to zlib first.
      flushToZlib(uwbuf_[0 .. uwpos_], Z_NO_FLUSH);
      uwpos_ = 0;
      flushToZlib(buf, Z_NO_FLUSH);
    } else if (len > 0) {
      if (uwbuf_.length - uwpos_ < len) {
        flushToZlib(uwbuf_[0 .. uwpos_], Z_NO_FLUSH);
        uwpos_ = 0;
      }
      // Fits: len < MIN_DIRECT_DEFLATE_SIZE <= uwbuf_.length (checked in the
      // constructor).
      uwbuf_[uwpos_ .. uwpos_ + len] = buf[];
      uwpos_ += len;
    }
  }

  /**
   * Compresses all pending data using Z_SYNC_FLUSH, writes it to the
   * underlying transport and flushes that.
   *
   * Throws: TTransportException (BAD_ARGS) if called after finish().
   */
  override void flush() {
    enforce(!outputFinished_, new TTransportException(
      "flush() called after finish()", TTransportException.Type.BAD_ARGS));

    flushToTransport(Z_SYNC_FLUSH);
  }

  /**
   * Returns the decompressed data currently buffered if at least len bytes
   * are available, null otherwise. The buf parameter is not used.
   */
  override const(ubyte)[] borrow(ubyte* buf, size_t len) {
    auto avail = readAvail;
    if (len <= avail) {
      // Only expose the part of urbuf that actually holds decompressed data;
      // anything beyond it is stale or has never been written.
      return urbuf_[urpos_ .. urpos_ + avail];
    }
    return null;
  }

  /**
   * Marks len bytes of previously borrow()ed data as read.
   *
   * Throws: TTransportException (BAD_ARGS) if fewer than len bytes are
   *   buffered.
   */
  override void consume(size_t len) {
    enforce(readAvail >= len, new TTransportException(
      "consume() did not follow a borrow().", TTransportException.Type.BAD_ARGS));
    urpos_ += len;
  }

  /**
   * Finalize the zlib stream.
   *
   * This causes zlib to flush any pending write data and write end-of-stream
   * information, including the checksum. Once finish() has been called, no
   * new data can be written to the stream.
   *
   * Throws: TTransportException (BAD_ARGS) if called more than once.
   */
  void finish() {
    enforce(!outputFinished_, new TTransportException(
      "finish() called on already finished TZlibTransport",
      TTransportException.Type.BAD_ARGS));
    flushToTransport(Z_FINISH);
  }

  /**
   * Verify the checksum at the end of the zlib stream (by finish()).
   *
   * May only be called after all data has been read. The remainder of the
   * stream trailer is read from the underlying transport as needed, even if
   * it arrives in several pieces.
   *
   * Throws: TTransportException when the checksum is corrupted or there is
   *   still unread data left.
   */
  void verifyChecksum() {
    // If zlib has already reported the end of the stream, the checksum has
    // already been verified.
    if (inputEnded_) return;

    enforce(!readAvail, new TTransportException(
      "verifyChecksum() called before end of zlib stream",
      TTransportException.Type.CORRUPTED_DATA));

    // The trailer might be split across several reads from the underlying
    // transport, so keep feeding zlib until it reports the end of the stream.
    // readFromZlib() will throw an exception if the checksum is bad.
    do {
      rstream_.next_out = urbuf_.ptr;
      rstream_.avail_out = to!uint(urbuf_.length);
      urpos_ = 0;

      enforce(readFromZlib(), new TTransportException(
        "checksum not available yet in verifyChecksum()",
        TTransportException.Type.CORRUPTED_DATA));

      // New payload data before the end of the stream means the caller has
      // not read everything yet.
      enforce(inputEnded_ || !readAvail, new TTransportException(
        "verifyChecksum() called before end of zlib stream",
        TTransportException.Type.CORRUPTED_DATA));
    } while (!inputEnded_);

    // If we get here, we are at the end of the stream and thus zlib has
    // successfully verified the checksum.
  }

private:
  /// Number of decompressed bytes in urbuf not yet handed out.
  size_t readAvail() const @property {
    return urbuf_.length - rstream_.avail_out - urpos_;
  }

  /**
   * Runs one inflate() step, first reading more compressed data from the
   * underlying transport into crbuf if zlib has consumed all of it.
   *
   * Returns: false if the underlying transport returned no data, true
   *   otherwise. Sets inputEnded_ once zlib reports the end of the stream.
   * Throws: TZlibException on any other zlib error (e.g. bad checksum).
   */
  bool readFromZlib() {
    assert(!inputEnded_);

    if (rstream_.avail_in == 0) {
      // zlib has used up all the compressed data we provided in crbuf, read
      // some more from the underlying transport.
      auto got = transport_.read(crbuf_);
      if (got == 0) return false;
      rstream_.next_in = crbuf_.ptr;
      rstream_.avail_in = to!uint(got);
    }

    // We have some compressed data now, uncompress it.
    auto zlibResult = inflate(rstream_, Z_SYNC_FLUSH);
    if (zlibResult == Z_STREAM_END) {
      inputEnded_ = true;
    } else {
      zlibEnforce(zlibResult, rstream_);
    }

    return true;
  }

  /**
   * Compresses everything buffered in uwbuf with the given zlib flush mode,
   * writes all compressed output to the underlying transport and flushes it.
   */
  void flushToTransport(int type) {
    // Compress remaining data in uwbuf_ to cwbuf_.
    flushToZlib(uwbuf_[0 .. uwpos_], type);
    uwpos_ = 0;

    // Write all compressed data to the transport.
    transport_.write(cwbuf_[0 .. $ - wstream_.avail_out]);
    wstream_.next_out = cwbuf_.ptr;
    wstream_.avail_out = to!uint(cwbuf_.length);

    // Flush the transport.
    transport_.flush();
  }

  /**
   * Feeds buf to deflate() using the given flush mode. Whenever cwbuf fills
   * up, its contents are written to the underlying transport.
   *
   * With Z_NO_FLUSH, returns once all of buf has been consumed; with
   * Z_SYNC_FLUSH/Z_FULL_FLUSH once the flush is complete; with Z_FINISH once
   * zlib reports the end of the stream (setting outputFinished_).
   */
  void flushToZlib(in ubyte[] buf, int type) {
    wstream_.next_in = cast(ubyte*)buf.ptr; // zlib only reads, cast is safe.
    wstream_.avail_in = to!uint(buf.length);

    while (true) {
      if (type == Z_NO_FLUSH && wstream_.avail_in == 0) {
        break;
      }

      if (wstream_.avail_out == 0) {
        // cwbuf has been exhausted by zlib, flush to the underlying transport.
        transport_.write(cwbuf_);
        wstream_.next_out = cwbuf_.ptr;
        wstream_.avail_out = to!uint(cwbuf_.length);
      }

      auto zlibResult = deflate(wstream_, type);

      if (type == Z_FINISH && zlibResult == Z_STREAM_END) {
        assert(wstream_.avail_in == 0);
        outputFinished_ = true;
        break;
      }

      // Z_BUF_ERROR means no progress was possible. Since output space is
      // always available at this point, that can only happen when there is
      // neither new input nor pending output, e.g. for two consecutive flushes
      // without a write in between. zlib documents Z_BUF_ERROR as non-fatal,
      // and there is nothing left to do.
      if (zlibResult == Z_BUF_ERROR && wstream_.avail_in == 0) {
        break;
      }

      zlibEnforce(zlibResult, wstream_);

      if ((type == Z_SYNC_FLUSH || type == Z_FULL_FLUSH) &&
        wstream_.avail_in == 0 && wstream_.avail_out != 0) {
        break;
      }
    }
  }

  /// Throws a TZlibException unless status is Z_OK.
  static void zlibEnforce(int status, z_stream* stream) {
    if (status != Z_OK) {
      throw new TZlibException(status, stream.msg);
    }
  }

  /// Logs (instead of throwing) a zlib error; used on cleanup paths.
  static void zlibLogError(int status, z_stream* stream) {
    if (status != Z_OK) {
      logError("TZlibTransport: zlib failure in destructor: %s",
        TZlibException.errorMessage(status, stream.msg));
    }
  }

  // Writes smaller than this are buffered up (due to zlib handling overhead).
  // Larger (or equal) writes are dumped straight to zlib.
  enum MIN_DIRECT_DEFLATE_SIZE = 32;

  TTransport transport_;
  z_stream* rstream_;
  z_stream* wstream_;

  /// Whether zlib has reached the end of the input stream.
  bool inputEnded_;

  /// Whether the output stream was already finish()ed.
  bool outputFinished_;

  /// Compressed input data buffer.
  ubyte[] crbuf_;

  /// Uncompressed input data buffer.
  ubyte[] urbuf_;
  size_t urpos_;

  /// Uncompressed output data buffer (where small writes are accumulated
  /// before handing over to zlib).
  ubyte[] uwbuf_;
  size_t uwpos_;

  /// Compressed output data buffer (filled by zlib, we flush it to the
  /// underlying transport).
  ubyte[] cwbuf_;
}

/**
 * Wraps given transports into TZlibTransports.
 */
alias TWrapperTransportFactory!TZlibTransport TZlibTransportFactory;

/**
 * An INTERNAL_ERROR-type TTransportException originating from an error
 * signaled by zlib.
 */
class TZlibException : TTransportException {
  /**
   * Params:
   *   statusCode = The zlib return code.
   *   msg = The z_stream.msg string reported by zlib (may be null).
   */
  this(int statusCode, const(char)* msg) {
    super(errorMessage(statusCode, msg), TTransportException.Type.INTERNAL_ERROR);
    zlibStatusCode = statusCode;
    zlibMsg = msg ? to!string(msg) : "(null)";
  }

  /// The zlib return code.
  int zlibStatusCode;

  /// The zlib error message, or "(null)" if zlib did not provide one.
  string zlibMsg;

  /// Formats a human-readable message from a zlib status code and message.
  static string errorMessage(int statusCode, const(char)* msg) {
    string result = "zlib error: ";

    if (msg) {
      result ~= to!string(msg);
    } else {
      result ~= "(no message)";
    }

    result ~= " (status code = " ~ to!string(statusCode) ~ ")";
    return result;
  }
}

version (unittest) {
  import std.exception : collectException;
  import thrift.transport.memory;
}

// Make sure basic reading/writing works.
unittest {
  auto buf = new TMemoryBuffer;
  auto zlib = new TZlibTransport(buf);

  immutable ubyte[] data = [1, 2, 3, 4, 5];
  zlib.write(data);
  zlib.finish();

  auto result = new ubyte[data.length];
  zlib.readAll(result);
  enforce(data == result);
  zlib.verifyChecksum();
}

// Make sure no data is written if write() is never called.
unittest {
  auto buf = new TMemoryBuffer;
  {
    scope zlib = new TZlibTransport(buf);
  }
  enforce(buf.getContents().length == 0);
}

// Make sure differently sized compressed read and write buffers work (the
// deflate output space must be derived from the compressed write buffer).
unittest {
  auto buf = new TMemoryBuffer;
  auto zlib = new TZlibTransport(buf, TZlibTransport.DEFAULT_URBUF_SIZE,
    TZlibTransport.DEFAULT_CRBUF_SIZE, TZlibTransport.DEFAULT_UWBUF_SIZE, 64);

  // Pseudo-random (i.e. poorly compressible) data, so that the compressed
  // output spans several compressed write buffers.
  auto data = new ubyte[1000];
  uint state = 1;
  foreach (ref b; data) {
    state = state * 1103515245 + 12345;
    b = cast(ubyte)(state >> 16);
  }
  zlib.write(data);
  zlib.finish();

  auto result = new ubyte[data.length];
  zlib.readAll(result);
  enforce(data == result);
  zlib.verifyChecksum();
}

// Make sure calling flush() repeatedly without writing in between works.
unittest {
  auto buf = new TMemoryBuffer;
  auto zlib = new TZlibTransport(buf);

  immutable ubyte[] data = [1, 2, 3, 4, 5];
  zlib.write(data);
  zlib.flush();
  zlib.flush();
  zlib.finish();

  auto result = new ubyte[data.length];
  zlib.readAll(result);
  enforce(data == result);
  zlib.verifyChecksum();
}

// Make sure borrow() only exposes data which has actually been decompressed.
unittest {
  auto buf = new TMemoryBuffer;
  auto zlib = new TZlibTransport(buf);

  immutable ubyte[] data = [1, 2, 3, 4, 5];
  zlib.write(data);
  zlib.finish();

  auto first = new ubyte[1];
  zlib.readAll(first);
  enforce(first[0] == data[0]);

  auto borrowed = zlib.borrow(null, 2);
  enforce(borrowed == data[1 .. $]);
  zlib.consume(borrowed.length);
  zlib.verifyChecksum();
}

// Make sure calling write()/flush()/finish() again after finish() throws.
unittest {
  auto buf = new TMemoryBuffer;
  auto zlib = new TZlibTransport(buf);

  zlib.write([1, 2, 3, 4, 5]);
  zlib.finish();

  auto ex = collectException!TTransportException(zlib.write([6]));
  enforce(ex && ex.type == TTransportException.Type.BAD_ARGS);

  ex = collectException!TTransportException(zlib.flush());
  enforce(ex && ex.type == TTransportException.Type.BAD_ARGS);

  ex = collectException!TTransportException(zlib.finish());
  enforce(ex && ex.type == TTransportException.Type.BAD_ARGS);
}

// Make sure verifying the checksum works even if it requires starting a new
// reading buffer after reading the payload has already been completed.
unittest {
  auto buf = new TMemoryBuffer;
  auto zlib = new TZlibTransport(buf);

  immutable ubyte[] data = [1, 2, 3, 4, 5];
  zlib.write(data);
  zlib.finish();

  // Size the compressed read buffer so that everything but the final byte
  // is consumed while reading the payload.
  zlib = new TZlibTransport(buf, TZlibTransport.DEFAULT_URBUF_SIZE,
    buf.getContents().length - 1); // The last byte belongs to the checksum.

  auto result = new ubyte[data.length];
  zlib.readAll(result);
  enforce(data == result);

  zlib.verifyChecksum();
}

// Make sure verifyChecksum() throws if we messed with the checksum.
unittest {
  import thrift.transport.range;

  auto buf = new TMemoryBuffer;
  auto zlib = new TZlibTransport(buf);

  immutable ubyte[] data = [1, 2, 3, 4, 5];
  zlib.write(data);
  zlib.finish();

  void testCorrupted(const(ubyte)[] corruptedData) {
    auto reader = new TZlibTransport(tInputRangeTransport(corruptedData));
    auto result = new ubyte[data.length];
    try {
      reader.readAll(result);

      // If it does read without complaining, the result should be correct.
      enforce(result == data);
    } catch (TZlibException e) {
      // zlib may already detect the corruption while reading the payload;
      // verifyChecksum() below must report it either way.
    }

    auto ex = collectException!TTransportException(reader.verifyChecksum());
    enforce(ex && ex.type == TTransportException.Type.CORRUPTED_DATA);
  }

  // Truncated checksum.
  testCorrupted(buf.getContents()[0 .. $ - 1]);

  // Modified checksum.
  auto modified = buf.getContents().dup;
  ++modified[$ - 1];
  testCorrupted(modified);
}