/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/**
 * HTTP transport implementation, modelled after the C++ one.
 *
 * Unfortunately, libcurl is quite heavyweight and supports only client-side
 * applications. This is an implementation of the basic HTTP/1.1 parts
 * supporting HTTP 100 Continue, chunked transfer encoding, keepalive, etc.
 */
module thrift.transport.http;

import std.algorithm : canFind, countUntil, endsWith, findSplit, min, startsWith;
import std.ascii : toLower;
import std.array : empty;
import std.conv : parse, to;
import std.datetime : Clock, SysTime, UTC;
import std.string : stripLeft;
import thrift.base : VERSION;
import thrift.transport.base;
import thrift.transport.memory;
import thrift.transport.socket;

/**
 * Base class for both client- and server-side HTTP transports.
 *
 * Outgoing data is collected in a memory buffer and sent as one HTTP message
 * (the header block from getHeader() followed by the body) on flush().
 *
 * Incoming messages are parsed from the wrapped transport. The header block
 * is processed line by line via parseStatusLine()/parseHeader(). The body is
 * either read up to the announced Content-Length or decoded from chunked
 * transfer encoding, and the decoded bytes are then handed out by read().
 */
abstract class THttpTransport : TBaseTransport {
  /**
   * Params:
   *   transport = The underlying transport used for the actual I/O.
   */
  this(TTransport transport) {
    transport_ = transport;
    readHeaders_ = true;
    httpBuf_ = new ubyte[HTTP_BUFFER_SIZE];
    httpBufRemaining_ = httpBuf_[0 .. 0];
    readBuffer_ = new TMemoryBuffer;
    writeBuffer_ = new TMemoryBuffer;
  }

  override bool isOpen() {
    return transport_.isOpen();
  }

  override bool peek() {
    return transport_.peek();
  }

  override void open() {
    transport_.open();
  }

  override void close() {
    transport_.close();
  }

  /**
   * Copies decoded body data of the current incoming message into buf.
   *
   * If no decoded data is pending, the next part of the message is fetched
   * first. If a new message starts, that is its header block. After that
   * comes either the whole Content-Length body or the next chunk of a chunked
   * body.
   *
   * Returns: The number of bytes copied into buf. Returns 0 if the underlying
   *   transport reported end of file before a new message began, or if the
   *   terminating zero-size chunk of a chunked body was reached.
   */
  override size_t read(ubyte[] buf) {
    if (!readBuffer_.peek()) {
      readBuffer_.reset();

      // Only block on the underlying transport if nothing is buffered yet.
      // Bytes that were already received (e.g. further chunks that arrived
      // together with an earlier one) must be processed first. Otherwise we
      // could end up waiting for data the peer is never going to send.
      if (httpBufRemaining_.empty) {
        httpBufRemaining_ = httpBuf_[0 .. 0];
        if (!refill()) return 0;
      }

      if (readHeaders_) {
        readHeaders();
      }

      size_t got;
      if (chunked_) {
        // A chunked body spans several read() calls, one chunk each.
        // readChunkedFooters() re-arms readHeaders_ once the terminating
        // chunk has been consumed.
        got = readChunked();
      } else {
        got = readContent(contentLength_);
        readHeaders_ = true;
      }

      if (got == 0) return 0;
    }
    return readBuffer_.read(buf);
  }

  /**
   * Finishes reading the current message. For a chunked body, this reads the
   * remaining chunks (their data goes into the read buffer) and the trailer.
   */
  override size_t readEnd() {
    if (chunked_) {
      while (!chunkedDone_) {
        readChunked();
      }
    }
    return 0;
  }

  /// Buffers buf; nothing is sent until flush() is called.
  override void write(in ubyte[] buf) {
    writeBuffer_.write(buf);
  }

  /**
   * Sends all buffered data as a single HTTP message (header block plus body)
   * and flushes the underlying transport.
   */
  override void flush() {
    auto data = writeBuffer_.getContents();
    string header = getHeader(data.length);

    transport_.write(cast(const(ubyte)[]) header);
    transport_.write(data);
    transport_.flush();

    // Reset the buffer and header variables.
    writeBuffer_.reset();
    readHeaders_ = true;
  }

  /**
   * The initial size of the buffer that HTTP messages are read into, in
   * bytes. The buffer expands as required.
   */
  enum HTTP_BUFFER_SIZE = 1024;

protected:
  /**
   * Returns the complete header block (start line, header fields and the
   * terminating empty line) for an outgoing message whose body is
   * dataLength bytes long.
   */
  abstract string getHeader(size_t dataLength);

  /**
   * Parses the start line of an incoming message (without the trailing CRLF).
   *
   * Returns: true if the header block that follows belongs to the message
   *   whose body should be read. false if this is an interim response
   *   (HTTP 100 Continue) after which another start line is expected.
   *   The implementations in this module throw a TTransportException for
   *   malformed or unsupported start lines.
   */
  abstract bool parseStatusLine(const(ubyte)[] status);

  /**
   * Interprets a single header line ("Name: value", without the CRLF).
   *
   * Only Transfer-Encoding (chunked) and Content-Length are evaluated; other
   * headers and lines without a colon are ignored. Header names are compared
   * case-insensitively.
   *
   * Throws: TTransportException (CORRUPTED_DATA) if the Content-Length value
   *   is not a valid number.
   */
  void parseHeader(const(ubyte)[] header) {
    auto split = findSplit(header, [':']);
    if (split[1].empty) {
      // No colon found.
      return;
    }

    static bool compToLower(ubyte a, ubyte b) {
      return toLower(cast(char)a) == toLower(cast(char)b);
    }

    if (startsWith!compToLower(split[0], cast(immutable(ubyte)[]) "transfer-encoding")) {
      if (endsWith!compToLower(split[2], cast(immutable(ubyte)[]) "chunked")) {
        chunked_ = true;
      }
    } else if (startsWith!compToLower(split[0], cast(immutable(ubyte)[]) "content-length")) {
      // chunked_ is deliberately not reset here. If both headers are present,
      // Transfer-Encoding takes precedence regardless of their order
      // (RFC 7230, section 3.3.3). readHeaders() already initializes chunked_
      // to false for every message.
      auto lengthString = stripLeft(cast(const(char)[]) split[2]);
      try {
        contentLength_ = parse!size_t(lengthString);
      } catch (Exception e) {
        throw new TTransportException("Invalid Content-Length: " ~ asText(split[2]),
          TTransportException.Type.CORRUPTED_DATA);
      }
    }
  }

private:
  /**
   * Returns the next CRLF-terminated line (without the CRLF) and advances past
   * it. It reads more data from the underlying transport as needed.
   *
   * If the transport reports end of file after a partial line, that partial
   * line is returned as the final line.
   *
   * Throws: TTransportException (END_OF_FILE) if the transport reports end of
   *   file while no data at all is buffered. Without this, callers looping on
   *   readLine() (such as readHeaders()) would spin forever on empty lines.
   */
  ubyte[] readLine() {
    while (true) {
      auto split = findSplit(httpBufRemaining_, cast(immutable(ubyte)[]) "\r\n");

      if (!split[1].empty) {
        // Set the remaining buffer to the part after \r\n and return the part
        // (line) before it.
        httpBufRemaining_ = split[2];
        return split[0];
      }

      // No CRLF yet. Move whatever we have to the front and read more.
      compactBuffer();

      if (!refill()) {
        if (httpBufRemaining_.empty) {
          throw new TTransportException("Could not refill buffer",
            TTransportException.Type.END_OF_FILE);
        }
        auto line = httpBufRemaining_;
        httpBufRemaining_ = httpBufRemaining_[$ .. $];
        return line;
      }
    }
  }

  /**
   * Moves the unconsumed bytes (httpBufRemaining_) to the start of httpBuf_.
   */
  void compactBuffer() {
    immutable len = httpBufRemaining_.length;
    if (httpBufRemaining_.ptr != httpBuf_.ptr) {
      // A slice assignment cannot be used here because the source and
      // destination may overlap, which D rejects as an overlapping copy.
      // An element-wise forward copy is safe because the destination always
      // starts before the source.
      foreach (i; 0 .. len) {
        httpBuf_[i] = httpBufRemaining_[i];
      }
    }
    httpBufRemaining_ = httpBuf_[0 .. len];
  }

  /**
   * Reads and processes a complete header block, including any preceding
   * interim (100 Continue) responses. It resets the per-message state first.
   */
  void readHeaders() {
    // Initialize headers state variables
    contentLength_ = 0;
    chunked_ = false;
    chunkedDone_ = false;
    chunkSize_ = 0;

    // Control state flow
    bool expectStatusLine = true;
    bool finished;

    // Loop until headers are finished
    while (true) {
      auto line = readLine();

      if (line.empty) {
        if (finished) {
          readHeaders_ = false;
          return;
        }
        // Must have been an HTTP 100, keep going for another status line.
        expectStatusLine = true;
      } else if (expectStatusLine) {
        expectStatusLine = false;
        finished = parseStatusLine(line);
      } else {
        parseHeader(line);
      }
    }
  }

  /**
   * Decodes one chunk of a chunked body into the read buffer. On the
   * terminating zero-size chunk, it consumes the trailer instead.
   *
   * Returns: The number of data bytes read (0 for the terminating chunk).
   * Throws: TTransportException (CORRUPTED_DATA) if the chunk-size line is
   *   not a hexadecimal number.
   */
  size_t readChunked() {
    auto line = readLine();

    size_t chunkSize;
    try {
      // parse stops at the first non-hex character, so chunk extensions
      // (";name=value") are ignored.
      auto sizeText = cast(const(char)[]) line;
      chunkSize = parse!size_t(sizeText, 16);
    } catch (Exception e) {
      throw new TTransportException("Invalid chunk size: " ~ asText(line),
        TTransportException.Type.CORRUPTED_DATA);
    }

    if (chunkSize == 0) {
      readChunkedFooters();
      return 0;
    }

    // Read data content, then the trailing CRLF after it.
    auto got = readContent(chunkSize);
    readLine();
    return got;
  }

  /**
   * Skips trailer lines up to the empty line that ends a chunked body. It
   * then marks the body as complete and arranges for the next read() to
   * parse a new header block.
   */
  void readChunkedFooters() {
    while (!readLine().empty) {}
    chunkedDone_ = true;
    readHeaders_ = true;
  }

  /**
   * Moves up to size body bytes into the read buffer, refilling from the
   * underlying transport as needed.
   *
   * Returns: The number of bytes moved. This is less than size only if the
   *   transport reported end of file.
   */
  size_t readContent(size_t size) {
    auto need = size;
    while (need > 0) {
      if (httpBufRemaining_.empty) {
        // We have given all the data, reset position to head of the buffer.
        httpBufRemaining_ = httpBuf_[0 .. 0];
        if (!refill()) return size - need;
      }

      auto give = min(httpBufRemaining_.length, need);
      readBuffer_.write(httpBufRemaining_[0 .. give]);
      httpBufRemaining_ = httpBufRemaining_[give .. $];
      need -= give;
    }
    return size;
  }

  /**
   * Performs one read from the underlying transport, appending to the
   * buffered data. The buffer is doubled first if less than a quarter of it
   * is free after the buffered data.
   *
   * Returns: false if the transport read returned no data, true otherwise.
   */
  bool refill() {
    size_t indexBegin = httpBufRemaining_.ptr - httpBuf_.ptr;
    size_t indexEnd = indexBegin + httpBufRemaining_.length;

    if (httpBuf_.length - indexEnd <= (httpBuf_.length / 4)) {
      httpBuf_.length *= 2;
      // Growing may move the array. Re-slice immediately so that
      // httpBufRemaining_ never refers to the old allocation, even if the
      // read below throws.
      httpBufRemaining_ = httpBuf_[indexBegin .. indexEnd];
    }

    // Read more data.
    auto got = transport_.read(httpBuf_[indexEnd .. $]);
    if (got == 0) return false;
    httpBufRemaining_ = httpBuf_[indexBegin .. indexEnd + got];
    return true;
  }

  TTransport transport_;

  TMemoryBuffer writeBuffer_;
  TMemoryBuffer readBuffer_;

  /// Whether the next read() must start by parsing a header block.
  bool readHeaders_;
  /// Whether the current incoming body uses chunked transfer encoding.
  bool chunked_;
  /// Whether the terminating chunk of the current chunked body was consumed.
  bool chunkedDone_;
  /// Reset by readHeaders(); not otherwise used in this module.
  size_t chunkSize_;
  /// Content-Length of the current incoming (non-chunked) body.
  size_t contentLength_;

  /// Receive buffer; grows in refill() as needed.
  ubyte[] httpBuf_;
  /// The received but not yet consumed part of httpBuf_.
  ubyte[] httpBufRemaining_;
}

/**
 * HTTP client transport.
 */
final class TClientHttpTransport : THttpTransport {
  /**
   * Constructs a client http transport operating on the passed underlying
   * transport.
   *
   * Params:
   *   transport = The underlying transport used for the actual I/O.
   *   host = The HTTP host string.
   *   path = The HTTP path string.
   */
  this(TTransport transport, string host, string path) {
    super(transport);
    host_ = host;
    path_ = path;
  }

  /**
   * Convenience overload for constructing a client HTTP transport using a
   * TSocket connecting to the specified host and port.
   *
   * Params:
   *   host = The server to connect to, also used as HTTP host string.
   *   port = The port to connect to.
   *   path = The HTTP path string.
   */
  this(string host, ushort port, string path) {
    this(new TSocket(host, port), host, path);
  }

protected:
  /// Builds the header block of a POST request carrying dataLength bytes.
  override string getHeader(size_t dataLength) {
    return "POST " ~ path_ ~ " HTTP/1.1\r\n" ~
      "Host: " ~ host_ ~ "\r\n" ~
      "Content-Type: application/x-thrift\r\n" ~
      "Content-Length: " ~ to!string(dataLength) ~ "\r\n" ~
      "Accept: application/x-thrift\r\n" ~
      "User-Agent: Thrift/" ~ VERSION ~ " (D/TClientHttpTransport)\r\n" ~
      "\r\n";
  }

  /**
   * Parses a response status line.
   *
   * Returns: true for 200 (the actual response follows) and false for 100
   *   (Continue; another status line follows).
   * Throws: TTransportException (CORRUPTED_DATA) for malformed lines or any
   *   other status code.
   */
  override bool parseStatusLine(const(ubyte)[] status) {
    // HTTP-Version SP Status-Code SP Reason-Phrase CRLF
    auto versionSplit = findSplit(status, [' ']);
    if (versionSplit[1].empty) {
      throw new TTransportException("Bad status: " ~ asText(status),
        TTransportException.Type.CORRUPTED_DATA);
    }

    auto codeSplit = findSplit(stripLeadingSpaces(versionSplit[2]), [' ']);
    if (codeSplit[1].empty) {
      throw new TTransportException("Bad status: " ~ asText(status),
        TTransportException.Type.CORRUPTED_DATA);
    }

    auto code = codeSplit[0];
    if (code == "200") {
      // HTTP 200 = OK, we got the response
      return true;
    } else if (code == "100") {
      // HTTP 100 = continue, just keep reading
      return false;
    }

    throw new TTransportException("Bad status (unhandled status code): " ~
      asText(status), TTransportException.Type.CORRUPTED_DATA);
  }

private:
  string host_;
  string path_;
}

/**
 * HTTP server transport.
 */
final class TServerHttpTransport : THttpTransport {
  /**
   * Constructs a new instance.
   *
   * Params:
   *   transport = The underlying transport used for the actual I/O.
   */
  this(TTransport transport) {
    super(transport);
  }

protected:
  /// Builds the header block of a 200 OK response carrying dataLength bytes.
  override string getHeader(size_t dataLength) {
    return "HTTP/1.1 200 OK\r\n" ~
      "Date: " ~ getRFC1123Time() ~ "\r\n" ~
      "Server: Thrift/" ~ VERSION ~ "\r\n" ~
      "Content-Type: application/x-thrift\r\n" ~
      "Content-Length: " ~ to!string(dataLength) ~ "\r\n" ~
      "Connection: Keep-Alive\r\n" ~
      "\r\n";
  }

  /**
   * Parses a request line. Only the POST method is accepted.
   *
   * Returns: Always true (a request has no interim responses).
   * Throws: TTransportException (CORRUPTED_DATA) for malformed lines or any
   *   method other than POST.
   */
  override bool parseStatusLine(const(ubyte)[] status) {
    // Method SP Request-URI SP HTTP-Version CRLF.
    auto split = findSplit(status, [' ']);
    if (split[1].empty) {
      throw new TTransportException("Bad status: " ~ asText(status),
        TTransportException.Type.CORRUPTED_DATA);
    }

    auto uriVersion = stripLeadingSpaces(split[2]);
    if (!canFind(uriVersion, ' ')) {
      throw new TTransportException("Bad status: " ~ asText(status),
        TTransportException.Type.CORRUPTED_DATA);
    }

    if (split[0] == "POST") {
      // POST method ok, looking for content.
      return true;
    }

    throw new TTransportException("Bad status (unsupported method): " ~
      asText(status), TTransportException.Type.CORRUPTED_DATA);
  }
}

/**
 * Wraps a transport into an HTTP server transport.
 */
alias TWrapperTransportFactory!TServerHttpTransport TServerHttpTransportFactory;

private {
  import std.string : format;

  /**
   * Formats a point in time as an RFC 1123 date, as used in the HTTP Date
   * header (e.g. "Sun, 06 Nov 1994 08:49:37 GMT").
   *
   * Params:
   *   sysTime = The time to format; converted to UTC first. Defaults to the
   *     current time.
   */
  string getRFC1123Time(SysTime sysTime = Clock.currTime(UTC())) {
    auto utcTime = sysTime.toUTC();

    auto dayName = capMemberName(utcTime.dayOfWeek);
    auto monthName = capMemberName(utcTime.month);

    // The format requires a two-digit day and zero-padded time fields.
    return format("%s, %02d %s %04d %02d:%02d:%02d GMT", dayName, utcTime.day,
      monthName, utcTime.year, utcTime.hour, utcTime.minute, utcTime.second);
  }

  unittest {
    import std.datetime : DateTime;
    import std.exception;
    auto t = SysTime(DateTime(1994, 11, 6, 8, 49, 37), UTC());
    enforce(getRFC1123Time(t) == "Sun, 06 Nov 1994 08:49:37 GMT");
  }

  /// Returns data with any leading space characters removed.
  const(ubyte)[] stripLeadingSpaces(const(ubyte)[] data) {
    auto start = countUntil!"a != b"(data, ' ');
    // countUntil returns -1 if data is empty or consists only of spaces.
    return start < 0 ? data[$ .. $] : data[start .. $];
  }

  unittest {
    import std.exception;
    enforce(stripLeadingSpaces(cast(const(ubyte)[]) "  200 OK") ==
      cast(const(ubyte)[]) "200 OK");
    enforce(stripLeadingSpaces(cast(const(ubyte)[]) "   ").empty);
    enforce(stripLeadingSpaces(cast(const(ubyte)[]) "").empty);
  }

  /// Interprets raw protocol bytes as text (for error messages).
  string asText(const(ubyte)[] data) {
    return to!string(cast(const(char)[]) data);
  }

  import std.ascii : toUpper;
  import std.traits : EnumMembers;

  /// Returns the name of the enum member val with its first letter upper-cased.
  string capMemberName(T)(T val) if (is(T == enum)) {
    foreach (i, e; EnumMembers!T) {
      enum name = __traits(derivedMembers, T)[i];
      enum capName = cast(char) toUpper(name[0]) ~ name [1 .. $];
      if (val == e) {
        return capName;
      }
    }
    throw new Exception("Not a member of " ~ T.stringof ~ ": " ~ to!string(val));
  }

  unittest {
    enum Foo {
      bar,
      bAZ
    }

    import std.exception;
    enforce(capMemberName(Foo.bar) == "Bar");
    enforce(capMemberName(Foo.bAZ) == "BAZ");
  }
}