1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */19 modulethrift.transport.buffered;
20 21 importstd.algorithm : min;
22 importstd.array : empty;
23 importstd.exception : enforce;
24 importthrift.transport.base;
/**
 * Wraps another transport and buffers reads and writes until the internal
 * buffers are exhausted, at which point new data is fetched resp. the
 * accumulated data is written out at once.
 *
 * Internally, two fixed-size arrays (readBuffer_ / writeBuffer_) are
 * allocated on construction. readAvail_ is the slice of readBuffer_ that
 * holds data not yet handed to the caller; writeAvail_ is the slice of
 * writeBuffer_ that is still free, so the number of bytes pending to be
 * written is the distance between writeBuffer_.ptr and writeAvail_.ptr.
 */
final class TBufferedTransport : TBaseTransport {
  /**
   * Constructs a new instance, using the default buffer sizes.
   *
   * Params:
   *   transport = The underlying transport to wrap.
   */
  this(TTransport transport) {
    this(transport, DEFAULT_BUFFER_SIZE);
  }

  /**
   * Constructs a new instance, using the specified buffer size for both the
   * read and the write buffer.
   *
   * Params:
   *   transport = The underlying transport to wrap.
   *   bufferSize = The size of the read and write buffers to use, in bytes.
   */
  this(TTransport transport, size_t bufferSize) {
    this(transport, bufferSize, bufferSize);
  }

  /**
   * Constructs a new instance, using the specified buffer sizes.
   *
   * Params:
   *   transport = The underlying transport to wrap.
   *   readBufferSize = The size of the read buffer to use, in bytes.
   *   writeBufferSize = The size of the write buffer to use, in bytes.
   */
  this(TTransport transport, size_t readBufferSize, size_t writeBufferSize) {
    transport_ = transport;
    readBuffer_ = new ubyte[readBufferSize];
    writeBuffer_ = new ubyte[writeBufferSize];
    // Nothing has been buffered for writing yet, so the whole write buffer
    // is free. readAvail_ stays empty until the first read/peek.
    writeAvail_ = writeBuffer_;
  }

  /// The default size of the read/write buffers, in bytes.
  enum int DEFAULT_BUFFER_SIZE = 512;

  /**
   * Whether the underlying transport reports itself as open.
   */
  override bool isOpen() @property {
    return transport_.isOpen();
  }

  /**
   * Checks whether there is data available to read.
   *
   * If the internal read buffer is empty, a single read on the underlying
   * transport is issued to try to refill it.
   *
   * Returns: true if the internal read buffer holds unread data afterwards.
   */
  override bool peek() {
    if (readAvail_.empty) {
      // If there is nothing available to read, see if we can get something
      // from the underlying transport.
      auto bytesRead = transport_.read(readBuffer_);
      readAvail_ = readBuffer_[0 .. bytesRead];
    }

    return !readAvail_.empty;
  }

  /**
   * Opens the underlying transport.
   */
  override void open() {
    transport_.open();
  }

  /**
   * Flushes any pending output and closes the underlying transport.
   *
   * Does nothing if the transport is not open.
   */
  override void close() {
    if (!isOpen) return;
    flush();
    transport_.close();
  }

  /**
   * Reads up to buf.length bytes into buf.
   *
   * Data still held in the internal read buffer is handed out first. If the
   * buffer is empty, it is refilled with one read from the underlying
   * transport, unless the request is larger than the internal buffer, in
   * which case the underlying transport reads directly into buf.
   *
   * Params:
   *   buf = The destination buffer.
   *
   * Returns: The number of bytes actually stored in buf.
   */
  override size_t read(ubyte[] buf) {
    if (readAvail_.empty) {
      // No data left in our buffer, fetch some from the underlying transport.

      if (buf.length > readBuffer_.length) {
        // If the amount of data requested is larger than our reading buffer,
        // directly read to the passed buffer. This probably doesn't occur too
        // often in practice (and even if it does, the underlying transport
        // probably cannot fulfill the request at once anyway), but it can't
        // harm to try…
        return transport_.read(buf);
      }

      auto bytesRead = transport_.read(readBuffer_);
      readAvail_ = readBuffer_[0 .. bytesRead];
    }

    // Hand over whatever we have.
    auto give = min(readAvail_.length, buf.length);
    buf[0 .. give] = readAvail_[0 .. give];
    readAvail_ = readAvail_[give .. $];
    return give;
  }

  /**
   * Shortcut version of readAll.
   *
   * If the internal read buffer already holds at least buf.length bytes, the
   * request is served directly from it; otherwise the call is forwarded to
   * the base class implementation.
   *
   * Params:
   *   buf = The destination buffer to fill completely.
   */
  override void readAll(ubyte[] buf) {
    if (readAvail_.length >= buf.length) {
      buf[] = readAvail_[0 .. buf.length];
      readAvail_ = readAvail_[buf.length .. $];
      return;
    }

    super.readAll(buf);
  }

  /**
   * Writes buf, buffering it internally where possible.
   *
   * Data that fits into the free part of the write buffer is only copied.
   * Otherwise, either the pending data and buf are both passed straight to
   * the underlying transport, or the write buffer is topped up with the
   * start of buf, written out in full, and the rest of buf is buffered.
   *
   * Params:
   *   buf = The data to write.
   */
  override void write(in ubyte[] buf) {
    if (writeAvail_.length >= buf.length) {
      // If the data fits in the buffer, just save it there.
      writeAvail_[0 .. buf.length] = buf;
      writeAvail_ = writeAvail_[buf.length .. $];
      return;
    }

    // We have to decide if we copy data from buf to our internal buffer, or
    // just directly write them out. The same considerations about avoiding
    // syscalls as for C++ apply here.
    // Note: despite its name, bytesAvail is the number of bytes currently
    // pending in the write buffer (the used prefix of writeBuffer_).
    auto bytesAvail = writeAvail_.ptr - writeBuffer_.ptr;
    if ((bytesAvail + buf.length >= 2 * writeBuffer_.length) || (bytesAvail == 0)) {
      // We would immediately need two syscalls anyway (or we don't have
      // anything) in our buffer to write, so just write out both buffers.
      if (bytesAvail > 0) {
        transport_.write(writeBuffer_[0 .. bytesAvail]);
        writeAvail_ = writeBuffer_;
      }

      transport_.write(buf);
      return;
    }

    // Fill up our internal buffer for a write.
    writeAvail_[] = buf[0 .. writeAvail_.length];
    auto left = buf[writeAvail_.length .. $];
    transport_.write(writeBuffer_);

    // Copy the rest into our buffer. It is guaranteed to fit, because the
    // branch above handled bytesAvail + buf.length >= 2 * writeBuffer_.length.
    writeBuffer_[0 .. left.length] = left[];
    writeAvail_ = writeBuffer_[left.length .. $];
  }

  /**
   * Writes out any data pending in the write buffer and flushes the
   * underlying transport.
   */
  override void flush() {
    // Write out any data waiting in the write buffer.
    auto bytesAvail = writeAvail_.ptr - writeBuffer_.ptr;
    if (bytesAvail > 0) {
      // Note that we reset writeAvail_ prior to calling the underlying protocol
      // to make sure the buffer is cleared even if the transport throws an
      // exception.
      writeAvail_ = writeBuffer_;
      transport_.write(writeBuffer_[0 .. bytesAvail]);
    }

    // Flush the underlying transport.
    transport_.flush();
  }

  /**
   * Gives access to buffered, not yet consumed read data without copying.
   *
   * Params:
   *   buf = Not used by this implementation.
   *   len = The minimum number of bytes the caller needs.
   *
   * Returns: All currently buffered unread data if it comprises at least len
   *   bytes, null otherwise. No data is fetched from the underlying
   *   transport.
   */
  override const(ubyte)[] borrow(ubyte* buf, size_t len) {
    if (len <= readAvail_.length) {
      return readAvail_;
    }
    return null;
  }

  /**
   * Removes len bytes from the front of the buffered unread data.
   *
   * Params:
   *   len = The number of bytes to discard.
   *
   * Throws: TTransportException (BAD_ARGS) if len exceeds the amount of
   *   data currently buffered.
   */
  override void consume(size_t len) {
    // Validate against the data actually available, not the buffer capacity:
    // a len between readAvail_.length and readBuffer_.length would otherwise
    // pass the check and slice readAvail_ out of bounds below.
    enforce(len <= readAvail_.length, new TTransportException(
      "Invalid consume length.", TTransportException.Type.BAD_ARGS));
    readAvail_ = readAvail_[len .. $];
  }

  /**
   * The wrapped transport.
   */
  TTransport underlyingTransport() @property {
    return transport_;
  }

private:
  // The wrapped transport all actual I/O is delegated to.
  TTransport transport_;

  // Backing storage for buffered reads resp. writes.
  ubyte[] readBuffer_;
  ubyte[] writeBuffer_;

  // Slice of readBuffer_ holding data not yet handed out to the caller.
  ubyte[] readAvail_;
  // Slice of writeBuffer_ that is still free for new data.
  ubyte[] writeAvail_;
}
/**
 * Transport factory wrapping the transports it is given into
 * TBufferedTransport instances.
 */
alias TBufferedTransportFactory = TWrapperTransportFactory!TBufferedTransport;