/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module thrift.transport.memory;

import core.exception : onOutOfMemoryError;
import core.stdc.stdlib : free, realloc;
import std.algorithm : min;
import std.conv : text;
import thrift.transport.base;

/**
 * A transport that simply reads from and writes to an in-memory buffer. Every
 * time you call write on it, the data is simply placed into a buffer, and
 * every time you call read, data is consumed from that buffer.
 *
 * Currently, the storage for written data is never reclaimed, even if the
 * buffer contents have already been read out again.
 */
final class TMemoryBuffer : TBaseTransport {
  /**
   * Constructs a new memory transport with an empty internal buffer.
   */
  this() {}

  /**
   * Constructs a new memory transport with an empty internal buffer,
   * reserving space for capacity bytes in advance.
   *
   * If the amount of data which will be written to the buffer is already
   * known on construction, this can give better performance than the default
   * constructor because reallocations can be avoided.
   *
   * If the preallocated buffer is exhausted, data can still be written to the
   * transport, but reallocations will happen.
   *
   * Params:
   *   capacity = Size of the initially reserved buffer (in bytes).
   */
  this(size_t capacity) {
    reset(capacity);
  }

  /**
   * Constructs a new memory transport initially containing the passed data.
   *
   * For now, the passed buffer is not intelligently used, the data is just
   * copied to the internal buffer.
   *
   * Params:
   *   contents = Initial contents available to be read.
   */
  this(in ubyte[] contents) {
    auto size = contents.length;
    reset(size);
    buffer_[0 .. size] = contents[];
    writeOffset_ = size;
  }

  /**
   * Destructor, frees the internally allocated buffer.
   */
  ~this() {
    free(buffer_);
  }

  /**
   * Returns a read-only view of the current buffer contents, i.e. the bytes
   * that have been written but not yet read.
   *
   * Note: For performance reasons, the returned slice is only valid for the
   * life of this object, and may be invalidated on the next write() call at
   * will – you might want to immediately .dup it if you intend to keep it
   * around.
   */
  const(ubyte)[] getContents() {
    return buffer_[readOffset_ .. writeOffset_];
  }

  /**
   * A memory transport is always open.
   */
  override bool isOpen() @property {
    return true;
  }

  /**
   * Returns true if there is written data which has not been read yet.
   */
  override bool peek() {
    return writeOffset_ - readOffset_ > 0;
  }

  /**
   * Opening is a no-op for a memory buffer.
   */
  override void open() {}

  /**
   * Closing is a no-op for a memory buffer, it is always open.
   */
  override void close() {}

  /**
   * Copies up to buf.length unread bytes into buf and marks them as read.
   *
   * Params:
   *   buf = Destination slice.
   *
   * Returns: The number of bytes actually copied, which is the smaller of
   *   buf.length and the number of unread bytes (possibly zero).
   */
  override size_t read(ubyte[] buf) {
    auto size = min(buf.length, writeOffset_ - readOffset_);
    buf[0 .. size] = buffer_[readOffset_ .. readOffset_ + size];
    readOffset_ += size;
    return size;
  }

  /**
   * Shortcut version of readAll() – using this over TBaseTransport.readAll()
   * can give us a nice speed increase because it is typically a very hot path
   * during deserialization.
   *
   * Params:
   *   buf = Destination slice, filled completely on success.
   *
   * Throws: TTransportException (END_OF_FILE) if fewer than buf.length unread
   *   bytes are available; nothing is consumed in that case.
   */
  override void readAll(ubyte[] buf) {
    auto available = writeOffset_ - readOffset_;
    if (buf.length > available) {
      throw new TTransportException(text("Cannot readAll() ", buf.length,
        " bytes of data because only ", available, " bytes are available."),
        TTransportException.Type.END_OF_FILE);
    }

    buf[] = buffer_[readOffset_ .. readOffset_ + buf.length];
    readOffset_ += buf.length;
  }

  /**
   * Appends the contents of buf to the internal buffer, growing it if the
   * remaining capacity is insufficient.
   *
   * Params:
   *   buf = Data to append.
   */
  override void write(in ubyte[] buf) {
    auto need = buf.length;
    if (bufferLen_ - writeOffset_ < need) {
      // Exponential growth. Start from bufferLen_ + 1 so that doubling also
      // makes progress when the buffer is currently empty (length zero).
      auto newLen = bufferLen_ + 1;
      while (newLen - writeOffset_ < need) newLen *= 2;
      cRealloc(buffer_, newLen);
      bufferLen_ = newLen;
    }

    buffer_[writeOffset_ .. writeOffset_ + need] = buf[];
    writeOffset_ += need;
  }

  /**
   * Returns a view of the unread data without consuming it.
   *
   * Params:
   *   buf = Unused by this implementation.
   *   len = Minimum number of bytes the caller needs.
   *
   * Returns: A slice over all unread bytes (which may be longer than len) if
   *   at least len bytes are available, null otherwise.
   */
  override const(ubyte)[] borrow(ubyte* buf, size_t len) {
    if (len <= writeOffset_ - readOffset_) {
      return buffer_[readOffset_ .. writeOffset_];
    } else {
      return null;
    }
  }

  /**
   * Marks len unread bytes as read.
   *
   * Params:
   *   len = Number of bytes to skip; must not exceed the number of unread
   *     bytes.
   *
   * Throws: TTransportException (END_OF_FILE) if len is larger than the
   *   number of unread bytes; nothing is consumed in that case.
   */
  override void consume(size_t len) {
    // Without this check readOffset_ could move past writeOffset_, which
    // would make the unsigned "available" computations in the other methods
    // wrap around and cause reads outside of the allocated buffer.
    auto available = writeOffset_ - readOffset_;
    if (len > available) {
      throw new TTransportException(text("Cannot consume() ", len,
        " bytes of data because only ", available, " bytes are available."),
        TTransportException.Type.END_OF_FILE);
    }

    readOffset_ += len;
  }

  /**
   * Discards all buffered data by resetting the read and write positions.
   * The allocated storage is kept.
   */
  void reset() {
    readOffset_ = 0;
    writeOffset_ = 0;
  }

  /**
   * Discards all buffered data and makes sure the internal buffer can hold
   * at least capacity bytes. The buffer is never shrunk.
   *
   * Params:
   *   capacity = Minimum size of the internal buffer (in bytes).
   */
  void reset(size_t capacity) {
    readOffset_ = 0;
    writeOffset_ = 0;
    if (bufferLen_ < capacity) {
      cRealloc(buffer_, capacity);
      bufferLen_ = capacity;
    }
  }

private:
  // Storage obtained from the C allocator (realloc), released in ~this().
  ubyte* buffer_;
  // Number of bytes allocated at buffer_.
  size_t bufferLen_;
  // Index of the next byte to be read.
  size_t readOffset_;
  // Index at which the next written byte is stored.
  size_t writeOffset_;
}

private {
  /**
   * Resizes the block at data to newSize bytes using C realloc, updating data
   * to point at the (possibly moved) block. Calls onOutOfMemoryError() if
   * realloc returns null.
   */
  void cRealloc(ref ubyte* data, size_t newSize) {
    auto result = realloc(data, newSize);
    if (result is null) onOutOfMemoryError();
    data = cast(ubyte*)result;
  }
}

version (unittest) {
  import std.exception;
}

unittest {
  auto a = new TMemoryBuffer(5);
  immutable(ubyte[]) testData = [1, 2, 3, 4];
  auto buf = new ubyte[testData.length];
  enforce(a.isOpen);

  // a should be empty.
  enforce(!a.peek());
  enforce(a.read(buf) == 0);
  assertThrown!TTransportException(a.readAll(buf));

  // Write some data and read it back again.
  a.write(testData);
  enforce(a.peek());
  enforce(a.getContents() == testData);
  enforce(a.read(buf) == testData.length);
  enforce(buf == testData);

  // a should be empty again.
  enforce(!a.peek());
  enforce(a.read(buf) == 0);
  assertThrown!TTransportException(a.readAll(buf));

  // Test the constructor which directly accepts initial data.
  auto b = new TMemoryBuffer(testData);
  enforce(b.isOpen);
  enforce(b.peek());
  enforce(b.getContents() == testData);

  // Test borrow().
  auto borrowed = b.borrow(null, testData.length);
  enforce(borrowed == testData);
  enforce(b.peek());
  b.consume(testData.length);
  enforce(!b.peek());

  // Consuming more than is available must fail and leave the buffer intact.
  assertThrown!TTransportException(b.consume(1));
  enforce(!b.peek());
  enforce(b.getContents().length == 0);
}