1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.transport.memory;
20 
21 import core.exception : onOutOfMemoryError;
22 import core.stdc.stdlib : free, realloc;
23 import std.algorithm : min;
24 import std.conv : text;
25 import thrift.transport.base;
26 
27 /**
28  * A transport that simply reads from and writes to an in-memory buffer. Every
29  * time you call write on it, the data is simply placed into a buffer, and
30  * every time you call read, data is consumed from that buffer.
31  *
32  * Currently, the storage for written data is never reclaimed, even if the
33  * buffer contents have already been read out again.
34  */
final class TMemoryBuffer : TBaseTransport {
  /**
   * Constructs a new memory transport with an empty internal buffer.
   */
  this() {}

  /**
   * Constructs a new memory transport with an empty internal buffer,
   * reserving space for capacity bytes in advance.
   *
   * If the amount of data which will be written to the buffer is already
   * known on construction, this can improve performance over the default
   * constructor because reallocations can be avoided.
   *
   * If the preallocated buffer is exhausted, data can still be written to the
   * transport, but reallocations will happen.
   *
   * Params:
   *   capacity = Size of the initially reserved buffer (in bytes).
   */
  this(size_t capacity) {
    reset(capacity);
  }

  /**
   * Constructs a new memory transport initially containing the passed data.
   *
   * For now, the passed buffer is not intelligently used, the data is just
   * copied to the internal buffer.
   *
   * Params:
   *   contents = Initial contents available to be read.
   */
  this(in ubyte[] contents) {
    auto size = contents.length;
    reset(size);
    buffer_[0 .. size] = contents[];
    writeOffset_ = size;
  }

  /**
   * Destructor, frees the internally allocated buffer.
   */
  ~this() {
    free(buffer_);
  }

  /**
   * Returns a read-only view of the current (written, but not yet read)
   * buffer contents.
   *
   * Note: For performance reasons, the returned slice is only valid for the
   * life of this object, and may be invalidated on the next write() call at
   * will – you might want to immediately .dup it if you intend to keep it
   * around.
   */
  const(ubyte)[] getContents() {
    return buffer_[readOffset_ .. writeOffset_];
  }

  /**
   * A memory transport is always open.
   */
  override bool isOpen() @property {
    return true;
  }

  /**
   * Returns whether there is written data which has not been read yet.
   */
  override bool peek() {
    return writeOffset_ - readOffset_ > 0;
  }

  /**
   * Opening is a no-op for a memory buffer.
   */
  override void open() {}

  /**
   * Closing is a no-op for a memory buffer, it is always open.
   */
  override void close() {}

  /**
   * Copies as many unread bytes as are available (at most buf.length) into
   * buf and marks them as read.
   *
   * Params:
   *   buf = Destination slice.
   *
   * Returns: The number of bytes actually copied; 0 if no unread data is
   *   left.
   */
  override size_t read(ubyte[] buf) {
    auto size = min(buf.length, writeOffset_ - readOffset_);
    buf[0 .. size] = buffer_[readOffset_ .. readOffset_ + size];
    readOffset_ += size;
    return size;
  }

  /**
   * Shortcut version of readAll() – using this over TBaseTransport.readAll()
   * can give us a nice speed increase because it is typically a very hot path
   * during deserialization.
   *
   * Params:
   *   buf = Destination slice, filled completely on success.
   *
   * Throws: TTransportException (END_OF_FILE) if fewer than buf.length unread
   *   bytes are available; in that case no data is consumed.
   */
  override void readAll(ubyte[] buf) {
    auto available = writeOffset_ - readOffset_;
    if (buf.length > available) {
      throw new TTransportException(text("Cannot readAll() ", buf.length,
        " bytes of data because only ", available, " bytes are available."),
        TTransportException.Type.END_OF_FILE);
    }

    buf[] = buffer_[readOffset_ .. readOffset_ + buf.length];
    readOffset_ += buf.length;
  }

  /**
   * Appends the passed data to the end of the buffer, enlarging the internal
   * storage if the remaining capacity is not sufficient.
   *
   * Params:
   *   buf = Data to append.
   */
  override void write(in ubyte[] buf) {
    auto need = buf.length;
    if (bufferLen_ - writeOffset_ < need) {
      // Exponential growth. Starting from bufferLen_ + 1 makes sure that
      // doubling also makes progress when the buffer is still empty.
      auto newLen = bufferLen_ + 1;
      while (newLen - writeOffset_ < need) newLen *= 2;
      cRealloc(buffer_, newLen);
      bufferLen_ = newLen;
    }

    buffer_[writeOffset_ .. writeOffset_ + need] = buf[];
    writeOffset_ += need;
  }

  /**
   * Gives direct access to the unread data without copying or consuming it.
   *
   * Params:
   *   buf = Not used by this implementation.
   *   len = Minimum number of bytes the caller needs.
   *
   * Returns: A slice over all currently unread data if at least len bytes
   *   are available, null otherwise. The slice is subject to the same
   *   lifetime restrictions as the one returned by getContents().
   */
  override const(ubyte)[] borrow(ubyte* buf, size_t len) {
    if (len <= writeOffset_ - readOffset_) {
      return buffer_[readOffset_ .. writeOffset_];
    } else {
      return null;
    }
  }

  /**
   * Marks len bytes of unread data as read.
   *
   * Params:
   *   len = Number of bytes to skip.
   *
   * Throws: TTransportException (END_OF_FILE) if len exceeds the number of
   *   unread bytes; the read position is left untouched in that case.
   */
  override void consume(size_t len) {
    // Without this check readOffset_ could move past writeOffset_, and the
    // unsigned subtraction used everywhere to compute the number of available
    // bytes would wrap around, letting later reads run past the written data.
    auto available = writeOffset_ - readOffset_;
    if (len > available) {
      throw new TTransportException(text("Cannot consume() ", len,
        " bytes of data because only ", available, " bytes are available."),
        TTransportException.Type.END_OF_FILE);
    }

    readOffset_ += len;
  }

  /**
   * Discards the buffer contents by rewinding the read and write positions.
   * The allocated storage is kept for reuse.
   */
  void reset() {
    readOffset_ = 0;
    writeOffset_ = 0;
  }

  /**
   * Discards the buffer contents and makes sure that the internal storage
   * can hold at least capacity bytes. The storage is never shrunk.
   *
   * Params:
   *   capacity = Minimum size of the internal buffer (in bytes).
   */
  void reset(size_t capacity) {
    readOffset_ = 0;
    writeOffset_ = 0;
    if (bufferLen_ < capacity) {
      cRealloc(buffer_, capacity);
      bufferLen_ = capacity;
    }
  }

private:
  ubyte* buffer_;       // C-heap storage, (re)allocated via cRealloc().
  size_t bufferLen_;    // Allocated size of buffer_ in bytes.
  size_t readOffset_;   // Index of the next byte to be read.
  size_t writeOffset_;  // Index at which the next written byte is stored.
}
185 
private {
  /**
   * Resizes the C-heap block referenced by data to newSize bytes using
   * realloc() and stores the (possibly moved) pointer back into data.
   *
   * If realloc() returns null, the failure is reported through
   * onOutOfMemoryError() before data is touched.
   */
  void cRealloc(ref ubyte* data, size_t newSize) {
    auto resized = cast(ubyte*)realloc(data, newSize);
    if (resized is null) {
      onOutOfMemoryError();
    }
    data = resized;
  }
}
193 
194 version (unittest) {
195   import std.exception;
196 }
197 
unittest {
  // Exercises the basic read/write cycle, the data-accepting constructor and
  // the borrow()/consume() pair of TMemoryBuffer.
  auto transport = new TMemoryBuffer(5);
  immutable(ubyte[]) payload = [1, 2, 3, 4];
  auto scratch = new ubyte[payload.length];
  enforce(transport.isOpen);

  // Nothing has been written yet, so there must be nothing to read.
  enforce(!transport.peek());
  enforce(transport.read(scratch) == 0);
  assertThrown!TTransportException(transport.readAll(scratch));

  // Round trip: whatever is written must come back out unchanged.
  transport.write(payload);
  enforce(transport.peek());
  enforce(transport.getContents() == payload);
  enforce(transport.read(scratch) == payload.length);
  enforce(scratch == payload);

  // Everything has been read, so the transport must look empty again.
  enforce(!transport.peek());
  enforce(transport.read(scratch) == 0);
  assertThrown!TTransportException(transport.readAll(scratch));

  // A transport constructed from existing data exposes it right away.
  auto prefilled = new TMemoryBuffer(payload);
  enforce(prefilled.isOpen);
  enforce(prefilled.peek());
  enforce(prefilled.getContents() == payload);

  // borrow() must hand out the data without consuming it; only consume()
  // advances the read position.
  auto view = prefilled.borrow(null, payload.length);
  enforce(view == payload);
  enforce(prefilled.peek());
  prefilled.consume(payload.length);
  enforce(!prefilled.peek());
}