/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module thrift.transport.base;

import core.stdc.string : strerror;
import std.conv : text;
import thrift.base;

/**
 * An entity data can be read from and/or written to.
 *
 * A TTransport implementation may be capable of either reading or writing, but
 * not necessarily both.
 */
interface TTransport {
  /**
   * Whether this transport is open.
   *
   * If a transport is closed, it can be opened by calling open(), and vice
   * versa for close().
   *
   * While a transport should always be open when trying to read/write data,
   * the related functions do not necessarily fail when called for a closed
   * transport. Situations like this could occur e.g. with a wrapper
   * transport which buffers data when the underlying transport has already
   * been closed (possibly because the connection was abruptly closed), but
   * there is still data left to be read in the buffers. This choice has been
   * made to simplify transport implementations, in terms of both code
   * complexity and runtime overhead.
   */
  bool isOpen() @property;

  /**
   * Tests whether there is more data to read or if the remote side is
   * still open.
   *
   * A typical use case would be a server checking if it should process
   * another request on the transport.
   */
  bool peek();

  /**
   * Opens the transport for communications.
   *
   * If the transport is already open, nothing happens.
   *
   * Throws: TTransportException if opening fails.
   */
  void open();

  /**
   * Closes the transport.
   *
   * If the transport is not open, nothing happens.
   *
   * Throws: TTransportException if closing fails.
   */
  void close();

  /**
   * Attempts to fill the given buffer by reading data.
   *
   * For potentially blocking data sources (e.g. sockets), read() will only
   * block if no data is available at all. If there is some data available,
   * but waiting for new data to arrive would be required to fill the whole
   * buffer, the readily available data will be immediately returned – use
   * readAll() if you want to wait until the whole buffer is filled.
   *
   * Params:
   *   buf = Slice to use as buffer.
   *
   * Returns: How many bytes were actually read
   *
   * Throws: TTransportException if an error occurs.
   */
  size_t read(ubyte[] buf);

  /**
   * Fills the given buffer by reading data into it, failing if not enough
   * data is available.
   *
   * Params:
   *   buf = Slice to use as buffer.
   *
   * Throws: TTransportException if insufficient data is available or reading
   *   fails altogether.
   */
  void readAll(ubyte[] buf);

  /**
   * Must be called by clients when read is completed.
   *
   * Implementations can choose to perform a transport-specific action, e.g.
   * logging the request to a file.
   *
   * Returns: The number of bytes read if available, 0 otherwise.
   */
  size_t readEnd();

  /**
   * Writes the passed slice of data.
   *
   * Note: You must call flush() to ensure the data is actually written,
   * and available to be read back in the future.  Destroying a TTransport
   * object does not automatically flush pending data – if you destroy a
   * TTransport object with written but unflushed data, that data may be
   * discarded.
   *
   * Params:
   *   buf = Slice of data to write.
   *
   * Throws: TTransportException if an error occurs.
   */
  void write(in ubyte[] buf);

  /**
   * Must be called by clients when write is completed.
   *
   * Implementations can choose to perform a transport-specific action, e.g.
   * logging the request to a file.
   *
   * Returns: The number of bytes written if available, 0 otherwise.
   */
  size_t writeEnd();

  /**
   * Flushes any pending data to be written.
   *
   * Must be called before destruction to ensure writes are actually complete,
   * otherwise pending data may be discarded. Typically used with buffered
   * transport mechanisms.
   *
   * Throws: TTransportException if an error occurs.
   */
  void flush();

  /**
   * Attempts to return a slice of <code>len</code> bytes of incoming data,
   * possibly copied into buf, not consuming them (i.e.: a later read will
   * return the same data).
   *
   * This method is meant to support protocols that need to read variable-
   * length fields. They can attempt to borrow the maximum amount of data that
   * they will need, then <code>consume()</code> what they actually use. Some
   * transports will not support this method and others will fail occasionally,
   * so protocols must be prepared to fall back to <code>read()</code> if
   * borrow fails.
   *
   * The transport must be open when calling this.
   *
   * Params:
   *   buf = A buffer where the data can be stored if needed, or null to
   *     indicate that the caller is not supplying storage, but would like a
   *     slice of an internal buffer, if available.
   *   len = The number of bytes to borrow.
   *
   * Returns: If the borrow succeeds, a slice containing the borrowed data,
   *   null otherwise. The slice will be at least as long as requested, but
   *   may be longer if the returned slice points into an internal buffer
   *   rather than buf.
   *
   * Throws: TTransportException if an error occurs.
   */
  const(ubyte)[] borrow(ubyte* buf, size_t len) out (result) {
    // FIXME: Commented out because len gets corrupted in
    // thrift.transport.memory borrow() unittest.
    // The length check is compiled out via version(none), so this
    // out-contract currently enforces nothing.
    version(none) assert(result is null || result.length >= len,
      "Buffer returned by borrow() too short.");
  }

  /**
   * Remove len bytes from the transport. This must always follow a borrow
   * of at least len bytes, and should always succeed.
   *
   * The transport must be open when calling this.
   *
   * Params:
   *   len = Number of bytes to consume.
   *
   * Throws: TTransportException if an error occurs.
   */
  void consume(size_t len);
}

/**
 * Provides basic fall-back implementations of the TTransport interface.
*/
class TBaseTransport : TTransport {
  /// Always returns false.
  override bool isOpen() @property {
    return false;
  }

  /// Returns whatever isOpen currently reports.
  override bool peek() {
    return this.isOpen;
  }

  /// Throws: TTransportException of type NOT_IMPLEMENTED, unconditionally.
  override void open() {
    enum reason = TTransportException.Type.NOT_IMPLEMENTED;
    throw new TTransportException("Cannot open TBaseTransport.", reason);
  }

  /// Throws: TTransportException of type NOT_IMPLEMENTED, unconditionally.
  override void close() {
    enum reason = TTransportException.Type.NOT_IMPLEMENTED;
    throw new TTransportException("Cannot close TBaseTransport.", reason);
  }

  /// Throws: TTransportException of type NOT_IMPLEMENTED, unconditionally.
  override size_t read(ubyte[] buf) {
    enum reason = TTransportException.Type.NOT_IMPLEMENTED;
    throw new TTransportException("Cannot read from a TBaseTransport.", reason);
  }

  /**
   * Fills buf completely by calling read() repeatedly on the part of the
   * buffer that is still unfilled.
   *
   * Params:
   *   buf = Slice to fill.
   *
   * Throws: TTransportException of type END_OF_FILE as soon as a read()
   *   call returns zero bytes before the buffer is full.
   */
  override void readAll(ubyte[] buf) {
    size_t wanted = buf.length;
    for (size_t filled = 0; filled < wanted; ) {
      size_t chunk = read(buf[filled .. $]);
      // size_t is unsigned, so "no progress" can only ever mean zero.
      if (chunk == 0) {
        auto msg = text("Could not readAll() ", wanted,
          " bytes as no more data was available after ", filled, " bytes.");
        throw new TTransportException(msg,
          TTransportException.Type.END_OF_FILE);
      }
      filled += chunk;
    }
  }

  /// No-op by default; returns 0.
  override size_t readEnd() {
    return 0;
  }

  /// Throws: TTransportException of type NOT_IMPLEMENTED, unconditionally.
  override void write(in ubyte[] buf) {
    enum reason = TTransportException.Type.NOT_IMPLEMENTED;
    throw new TTransportException("Cannot write to a TBaseTransport.", reason);
  }

  /// No-op by default; returns 0.
  override size_t writeEnd() {
    return 0;
  }

  /// No-op by default.
  override void flush() {
  }

  /// Never lends out data: always returns null, which borrow() permits.
  override const(ubyte)[] borrow(ubyte* buf, size_t len) {
    return null;
  }

  /// Throws: TTransportException of type NOT_IMPLEMENTED, unconditionally.
  override void consume(size_t len) {
    enum reason = TTransportException.Type.NOT_IMPLEMENTED;
    throw new TTransportException("Cannot consume from a TBaseTransport.",
      reason);
  }

  /// The constructor is protected and does nothing.
  protected this() {}
}

/**
 * Makes a TTransport which wraps a given source transport in some way.
*
 * A common use case is inside server implementations, where the raw client
 * connections accepted from e.g. TServerSocket need to be wrapped into
 * buffered or compressed transports.
 */
class TTransportFactory {
  /**
   * Identity factory: hands back the very transport it was given.
   *
   * Params:
   *   trans = The transport to wrap.
   *
   * Returns: trans itself, unchanged.
   */
  TTransport getTransport(TTransport trans) {
    TTransport result = trans;
    return result;
  }
}

/**
 * Factory for any transport type T that is a TTransport and can be
 * constructed from a single TTransport argument.
 */
class TWrapperTransportFactory(T) if (
  is(T : TTransport) && __traits(compiles, new T(TTransport.init))
) : TTransportFactory {
  /**
   * Builds a new T around the given transport.
   *
   * Params:
   *   trans = The transport to wrap.
   *
   * Returns: A freshly constructed T wrapping trans.
   */
  override T getTransport(TTransport trans) {
    auto wrapped = new T(trans);
    return wrapped;
  }
}

/**
 * Exception raised for transport-level failures; carries a Type code in
 * addition to the message.
 */
class TTransportException : TException {
  /**
   * Error codes for the various types of exceptions.
   */
  enum Type {
    UNKNOWN, ///
    NOT_OPEN, ///
    TIMED_OUT, ///
    END_OF_FILE, ///
    INTERRUPTED, ///
    BAD_ARGS, ///
    CORRUPTED_DATA, ///
    INTERNAL_ERROR, ///
    NOT_IMPLEMENTED ///
  }

  /**
   * Creates an exception whose message is a fixed description of type.
   */
  this(Type type, string file = __FILE__, size_t line = __LINE__,
    Throwable next = null)
  {
    // Maps each code to its canned message; anything outside the declared
    // members falls through to the "invalid" text.
    static string describe(Type kind) {
      string label;
      switch (kind) {
        case Type.UNKNOWN:
          label = "Unknown transport exception";
          break;
        case Type.NOT_OPEN:
          label = "Transport not open";
          break;
        case Type.TIMED_OUT:
          label = "Timed out";
          break;
        case Type.END_OF_FILE:
          label = "End of file";
          break;
        case Type.INTERRUPTED:
          label = "Interrupted";
          break;
        case Type.BAD_ARGS:
          label = "Invalid arguments";
          break;
        case Type.CORRUPTED_DATA:
          label = "Corrupted Data";
          break;
        case Type.INTERNAL_ERROR:
          label = "Internal error";
          break;
        case Type.NOT_IMPLEMENTED:
          label = "Not implemented";
          break;
        default:
          label = "(Invalid exception type)";
          break;
      }
      return label;
    }

    this(describe(type), type, file, line, next);
  }

  /**
   * Creates an exception with a custom message and type Type.UNKNOWN.
   */
  this(string msg, string file = __FILE__, size_t line = __LINE__,
    Throwable next = null)
  {
    this(msg, Type.UNKNOWN, file, line, next);
  }

  /**
   * Creates an exception with both a custom message and an explicit type.
   * The other two constructors delegate here.
   */
  this(string msg, Type type, string file = __FILE__, size_t line = __LINE__,
    Throwable next = null)
  {
    super(msg, file, line, next);
    this.type_ = type;
  }

  /// The error code this exception was constructed with.
  @property Type type() const nothrow {
    return type_;
  }

  /// Backing storage for the type property.
  protected Type type_;
}

/**
 * Compile-time predicate: true if T is implicitly convertible to TTransport,
 * false otherwise.
 */
template isTTransport(T) {
  enum bool isTTransport = is(T : TTransport);
}