1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
/**
 * HTTP transport implementation, modelled after the C++ one.
 *
 * Unfortunately, libcurl is quite heavyweight and supports only client-side
 * applications. This is an implementation of the basic HTTP/1.1 parts
 * supporting HTTP 100 Continue, chunked transfer encoding, keepalive, etc.
 */
27 module thrift.transport.http;
28
29 import std.algorithm : canFind, countUntil, endsWith, findSplit, min, startsWith;
30 import std.ascii : toLower;
31 import std.array : empty;
32 import std.conv : parse, to;
33 import std.datetime : Clock, UTC;
34 import std.string : stripLeft;
35 import thrift.base : VERSION;
36 import thrift.transport.base;
37 import thrift.transport.memory;
38 import thrift.transport.socket;
39
/**
 * Base class for both client- and server-side HTTP transports.
 *
 * Data passed to write() is collected in memory and sent as the body of a
 * single HTTP message on flush(); the message head is supplied by the
 * subclass via getHeader(). On the read side, the start line and the headers
 * of an incoming message are parsed and stripped, and only the body (plain or
 * chunked) is handed out by read().
 */
abstract class THttpTransport : TBaseTransport {
  /**
   * Constructs a new instance operating on the passed underlying transport.
   *
   * Params:
   *   transport = The underlying transport used for the actual I/O.
   */
  this(TTransport transport) {
    transport_ = transport;
    readHeaders_ = true;
    httpBuf_ = new ubyte[HTTP_BUFFER_SIZE];
    httpBufRemaining_ = httpBuf_[0 .. 0];
    readBuffer_ = new TMemoryBuffer;
    writeBuffer_ = new TMemoryBuffer;
  }

  /// Forwards to the underlying transport.
  override bool isOpen() {
    return transport_.isOpen();
  }

  /// Forwards to the underlying transport.
  override bool peek() {
    return transport_.peek();
  }

  /// Forwards to the underlying transport.
  override void open() {
    transport_.open();
  }

  /// Forwards to the underlying transport.
  override void close() {
    transport_.close();
  }

  /**
   * Reads body data of the current HTTP message into buf.
   *
   * If no decoded body data is pending, the next part of the message is
   * fetched first: the headers (if a new message starts) followed by either
   * the whole Content-Length delimited body or a single chunk.
   *
   * Returns: The result of reading from the internal body buffer, or 0 if no
   *   body data could be obtained (no more data on the underlying transport,
   *   empty body, or end of a chunked body).
   *
   * Throws: TTransportException if the HTTP framing is malformed or the
   *   stream ends in the middle of a line.
   */
  override size_t read(ubyte[] buf) {
    if (!readBuffer_.peek()) {
      readBuffer_.reset();

      // Only hit the underlying transport if nothing is buffered. Data which
      // has already been received (e.g. the next chunk) must be processed
      // without waiting for further input.
      if (httpBufRemaining_.empty) {
        httpBufRemaining_ = httpBuf_[0 .. 0];
        if (!refill()) return 0;
      }

      if (readHeaders_) {
        readHeaders();
      }

      size_t got;
      if (chunked_) {
        // readHeaders_ stays false until the terminating chunk has been
        // consumed (see readChunkedFooters()), as further chunks of the same
        // message are not preceded by headers.
        got = readChunked();
      } else {
        got = readContent(contentLength_);
        // The body has been consumed, the next thing to expect is a new
        // message.
        readHeaders_ = true;
      }

      if (got == 0) return 0;
    }
    return readBuffer_.read(buf);
  }

  /**
   * Consumes any pending chunked data (terminating chunk, footers, etc.) of
   * the current message.
   *
   * Returns: Always 0.
   */
  override size_t readEnd() {
    if (chunked_) {
      while (!chunkedDone_) {
        readChunked();
      }
    }
    return 0;
  }

  /**
   * Appends buf to the body of the outgoing message. Nothing is sent until
   * flush() is called.
   */
  override void write(in ubyte[] buf) {
    writeBuffer_.write(buf);
  }

  /**
   * Sends the data written so far as a single HTTP message: the head
   * generated by getHeader(), followed by the buffered body.
   */
  override void flush() {
    auto data = writeBuffer_.getContents();
    string header = getHeader(data.length);

    transport_.write(cast(const(ubyte)[]) header);
    transport_.write(data);
    transport_.flush();

    // Reset the buffer and header variables.
    writeBuffer_.reset();
    readHeaders_ = true;
  }

  /**
   * The size of the buffer to read HTTP requests into, in bytes. Will expand
   * as required.
   */
  enum HTTP_BUFFER_SIZE = 1024;

protected:
  /**
   * Returns the head (start line and headers, including the terminating
   * empty line) of an outgoing message with a body of dataLength bytes.
   */
  abstract string getHeader(size_t dataLength);

  /**
   * Parses the first line of an incoming message.
   *
   * Returns: true if the headers following the line belong to the message
   *   carrying the payload, false if another message (with its own start
   *   line) is expected after them.
   */
  abstract bool parseStatusLine(const(ubyte)[] status);

  /**
   * Parses a single header line, updating the chunked/content length state
   * for the Transfer-Encoding and Content-Length headers. Lines without a
   * colon and all other headers are ignored.
   *
   * Throws: TTransportException if the Content-Length value is not a number.
   */
  void parseHeader(const(ubyte)[] header) {
    auto split = findSplit(header, [':']);
    if (split[1].empty) {
      // No colon found.
      return;
    }

    static bool compToLower(ubyte a, ubyte b) {
      return toLower(cast(char)a) == toLower(cast(char)b);
    }

    if (startsWith!compToLower(split[0], cast(ubyte[])"transfer-encoding")) {
      if (endsWith!compToLower(split[2], cast(ubyte[])"chunked")) {
        chunked_ = true;
      }
    } else if (startsWith!compToLower(split[0], cast(ubyte[])"content-length")) {
      chunked_ = false;
      auto lengthString = stripLeft(cast(const(char)[])split[2]);
      try {
        contentLength_ = parse!size_t(lengthString);
      } catch (Exception e) {
        // Report malformed input the same way as an invalid chunk size
        // instead of leaking a conversion exception to the caller.
        throw new TTransportException("Invalid content length: " ~
          to!string(cast(const(char)[])split[2]),
          TTransportException.Type.CORRUPTED_DATA);
      }
    }
  }

private:
  /**
   * Returns the next line from the HTTP buffer, without the trailing CRLF,
   * refilling the buffer from the underlying transport as needed.
   *
   * If the stream ends in the middle of a line, the partial line is returned.
   *
   * Throws: TTransportException if the stream has ended and there is no data
   *   left at all (which could otherwise not be told apart from an empty
   *   line).
   */
  ubyte[] readLine() {
    while (true) {
      auto split = findSplit(httpBufRemaining_, cast(ubyte[])"\r\n");

      if (!split[1].empty) {
        // Set the remaining buffer to the part after \r\n and return the part
        // (line) before it.
        httpBufRemaining_ = split[2];
        return split[0];
      }

      // No CRLF yet, move whatever we have now to front and refill.
      if (httpBufRemaining_.empty) {
        httpBufRemaining_ = httpBuf_[0 .. 0];
      } else if (httpBufRemaining_.ptr !is httpBuf_.ptr) {
        // Source and destination may overlap, which is not allowed for array
        // slice assignment, so memmove is used.
        import core.stdc.string : memmove;
        memmove(httpBuf_.ptr, httpBufRemaining_.ptr, httpBufRemaining_.length);
        httpBufRemaining_ = httpBuf_[0 .. httpBufRemaining_.length];
      }

      if (!refill()) {
        if (httpBufRemaining_.empty) {
          throw new TTransportException(
            "Unexpected end of stream while reading HTTP line.",
            TTransportException.Type.END_OF_FILE);
        }

        // Hand out the partial line and leave an empty window right after it.
        auto buf = httpBufRemaining_;
        httpBufRemaining_ = httpBufRemaining_[$ .. $];
        return buf;
      }
    }
  }

  /**
   * Reads the start line and the headers of the next message, skipping
   * messages for which parseStatusLine() returns false (e.g. HTTP 100).
   */
  void readHeaders() {
    // Initialize headers state variables
    contentLength_ = 0;
    chunked_ = false;
    chunkedDone_ = false;
    chunkSize_ = 0;

    // Control state flow
    bool statusLine = true;
    bool finished;

    // Loop until headers are finished
    while (true) {
      auto line = readLine();

      if (line.length != 0) {
        if (statusLine) {
          statusLine = false;
          finished = parseStatusLine(line);
        } else {
          parseHeader(line);
        }
        continue;
      }

      // Empty line: end of a header block.
      if (finished) {
        readHeaders_ = false;
        return;
      }

      // Must have been an HTTP 100, keep going for another status line
      statusLine = true;
    }
  }

  /**
   * Reads a single chunk of a chunked body into the read buffer. For the
   * terminating (zero-sized) chunk, the footers are consumed as well.
   *
   * Returns: The number of body bytes read, 0 for the terminating chunk.
   *
   * Throws: TTransportException if the chunk size line is invalid.
   */
  size_t readChunked() {
    auto line = readLine();
    size_t chunkSize;
    try {
      auto charLine = cast(char[])line;
      chunkSize = parse!size_t(charLine, 16);
    } catch (Exception e) {
      throw new TTransportException("Invalid chunk size: " ~
        to!string(cast(const(char)[])line),
        TTransportException.Type.CORRUPTED_DATA);
    }

    if (chunkSize == 0) {
      readChunkedFooters();
      return 0;
    }

    // Read data content
    auto length = readContent(chunkSize);
    // Read trailing CRLF after content
    readLine();
    return length;
  }

  /**
   * Skips the footers after the terminating chunk, up to and including the
   * empty line which ends the message.
   */
  void readChunkedFooters() {
    while (true) {
      auto line = readLine();
      if (line.length == 0) {
        chunkedDone_ = true;
        // The chunked message is complete, whatever follows is a new message.
        readHeaders_ = true;
        break;
      }
    }
  }

  /**
   * Moves up to size bytes of body data from the HTTP buffer (refilling it as
   * needed) into the read buffer.
   *
   * Returns: The number of bytes moved, which is less than size only if the
   *   underlying transport ran out of data.
   */
  size_t readContent(size_t size) {
    auto need = size;
    while (need > 0) {
      if (httpBufRemaining_.length == 0) {
        // We have given all the data, reset position to head of the buffer.
        httpBufRemaining_ = httpBuf_[0 .. 0];
        if (!refill()) return size - need;
      }

      auto give = min(httpBufRemaining_.length, need);
      readBuffer_.write(cast(ubyte[])httpBufRemaining_[0 .. give]);
      httpBufRemaining_ = httpBufRemaining_[give .. $];
      need -= give;
    }
    return size;
  }

  /**
   * Reads more data from the underlying transport, appending it to the
   * unconsumed data in the HTTP buffer. The buffer is doubled in size if a
   * quarter or less of it is free behind the unconsumed data.
   *
   * Returns: false if the underlying transport did not return any data.
   */
  bool refill() {
    auto indexBegin = httpBufRemaining_.ptr - httpBuf_.ptr;
    auto indexEnd = indexBegin + httpBufRemaining_.length;

    if (httpBuf_.length - indexEnd <= (httpBuf_.length / 4)) {
      httpBuf_.length *= 2;
      // Growing the array may relocate it. Re-anchor the window right away so
      // that it refers to the current buffer even if the read below does not
      // return any data.
      httpBufRemaining_ = httpBuf_[indexBegin .. indexEnd];
    }

    // Read more data.
    auto got = transport_.read(httpBuf_[indexEnd .. $]);
    if (got == 0) return false;
    httpBufRemaining_ = httpBuf_[indexBegin .. indexEnd + got];
    return true;
  }

  /// The underlying transport used for the actual I/O.
  TTransport transport_;

  /// Body of the outgoing message, sent on flush().
  TMemoryBuffer writeBuffer_;
  /// Decoded body data of the incoming message not yet handed out by read().
  TMemoryBuffer readBuffer_;

  /// Whether the next data to process is the head of a new message.
  bool readHeaders_;
  /// Whether the body of the current incoming message is chunked.
  bool chunked_;
  /// Whether the terminating chunk and footers have been consumed.
  bool chunkedDone_;
  /// Reset by readHeaders(), otherwise unused in this class.
  size_t chunkSize_;
  /// Value of the Content-Length header of the current incoming message.
  size_t contentLength_;

  /// Raw data received from the underlying transport.
  ubyte[] httpBuf_;
  /// The part of httpBuf_ which has been received, but not yet processed.
  ubyte[] httpBufRemaining_;
}
294
/**
 * HTTP client transport.
 *
 * Sends the written data as body of a POST request and accepts HTTP 200
 * responses (optionally preceded by HTTP 100 Continue).
 */
final class TClientHttpTransport : THttpTransport {
  /**
   * Constructs a client http transport operating on the passed underlying
   * transport.
   *
   * Params:
   *   transport = The underlying transport used for the actual I/O.
   *   host = The HTTP host string.
   *   path = The HTTP path string.
   */
  this(TTransport transport, string host, string path) {
    super(transport);
    host_ = host;
    path_ = path;
  }

  /**
   * Convenience overload for constructing a client HTTP transport using a
   * TSocket connecting to the specified host and port.
   *
   * Params:
   *   host = The server to connect to, also used as HTTP host string.
   *   port = The port to connect to.
   *   path = The HTTP path string.
   */
  this(string host, ushort port, string path) {
    this(new TSocket(host, port), host, path);
  }

protected:
  /**
   * Returns the head of a POST request to the configured host and path
   * carrying a body of dataLength bytes.
   */
  override string getHeader(size_t dataLength) {
    return "POST " ~ path_ ~ " HTTP/1.1\r\n" ~
      "Host: " ~ host_ ~ "\r\n" ~
      "Content-Type: application/x-thrift\r\n" ~
      "Content-Length: " ~ to!string(dataLength) ~ "\r\n" ~
      "Accept: application/x-thrift\r\n" ~
      "User-Agent: Thrift/" ~ VERSION ~ " (D/TClientHttpTransport)\r\n" ~
      "\r\n";
  }

  /**
   * Parses the status line of a response.
   *
   * Returns: true for HTTP 200 (the response carrying the payload), false
   *   for HTTP 100 (another response will follow).
   *
   * Throws: TTransportException if the line is malformed or carries any
   *   other status code.
   */
  override bool parseStatusLine(const(ubyte)[] status) {
    // HTTP-Version SP Status-Code SP Reason-Phrase CRLF
    auto firstSplit = findSplit(status, [' ']);
    if (firstSplit[1].empty) {
      throw badStatus("Bad status: ", status);
    }

    // Skip any additional spaces in front of the status code. countUntil
    // yields -1 if nothing but spaces (or nothing at all) is left, which must
    // not be used as slice index.
    auto codeStart = countUntil!"a != b"(firstSplit[2], ' ');
    if (codeStart < 0) {
      throw badStatus("Bad status: ", status);
    }

    auto codeReason = firstSplit[2][codeStart .. $];
    auto secondSplit = findSplit(codeReason, [' ']);
    if (secondSplit[1].empty) {
      throw badStatus("Bad status: ", status);
    }

    if (secondSplit[0] == "200") {
      // HTTP 200 = OK, we got the response
      return true;
    } else if (secondSplit[0] == "100") {
      // HTTP 100 = continue, just keep reading
      return false;
    }

    throw badStatus("Bad status (unhandled status code): ", status);
  }

private:
  /**
   * Creates the exception for an unacceptable status line, including the
   * line as text (rather than as list of byte values) in the message.
   */
  static TTransportException badStatus(string prefix, const(ubyte)[] status) {
    return new TTransportException(
      prefix ~ to!string(cast(const(char)[])status),
      TTransportException.Type.CORRUPTED_DATA);
  }

  string host_;
  string path_;
}
369
/**
 * HTTP server transport.
 *
 * Accepts POST requests and sends the written data as body of an HTTP 200
 * response.
 */
final class TServerHttpTransport : THttpTransport {
  /**
   * Constructs a new instance.
   *
   * Params:
   *   transport = The underlying transport used for the actual I/O.
   */
  this(TTransport transport) {
    super(transport);
  }

protected:
  /**
   * Returns the head of an HTTP 200 response carrying a body of dataLength
   * bytes.
   */
  override string getHeader(size_t dataLength) {
    return "HTTP/1.1 200 OK\r\n" ~
      "Date: " ~ getRFC1123Time() ~ "\r\n" ~
      "Server: Thrift/" ~ VERSION ~ "\r\n" ~
      "Content-Type: application/x-thrift\r\n" ~
      "Content-Length: " ~ to!string(dataLength) ~ "\r\n" ~
      "Connection: Keep-Alive\r\n" ~
      "\r\n";
  }

  /**
   * Parses the request line of a request.
   *
   * Returns: true for POST requests.
   *
   * Throws: TTransportException if the line is malformed or uses any other
   *   method.
   */
  override bool parseStatusLine(const(ubyte)[] status) {
    // Method SP Request-URI SP HTTP-Version CRLF.
    auto split = findSplit(status, [' ']);
    if (split[1].empty) {
      throw badStatus("Bad status: ", status);
    }

    // Skip any additional spaces in front of the URI. countUntil yields -1
    // if nothing but spaces (or nothing at all) is left, which must not be
    // used as slice index.
    auto uriStart = countUntil!"a != b"(split[2], ' ');
    if (uriStart < 0) {
      throw badStatus("Bad status: ", status);
    }

    auto uriVersion = split[2][uriStart .. $];
    if (!canFind(uriVersion, ' ')) {
      throw badStatus("Bad status: ", status);
    }

    if (split[0] == "POST") {
      // POST method ok, looking for content.
      return true;
    }

    throw badStatus("Bad status (unsupported method): ", status);
  }

private:
  /**
   * Creates the exception for an unacceptable request line, including the
   * line as text (rather than as list of byte values) in the message.
   */
  static TTransportException badStatus(string prefix, const(ubyte)[] status) {
    return new TTransportException(
      prefix ~ to!string(cast(const(char)[])status),
      TTransportException.Type.CORRUPTED_DATA);
  }
}
418
/**
 * Transport factory wrapping transports into an HTTP server transport
 * (TWrapperTransportFactory instantiated with TServerHttpTransport).
 */
alias TServerHttpTransportFactory = TWrapperTransportFactory!TServerHttpTransport;
423
private {
  import std.string : format;

  /**
   * Returns the current time (UTC) in the fixed-width RFC 1123 format used
   * for HTTP dates, e.g. "Sun, 06 Nov 1994 08:49:37 GMT".
   *
   * The day and month names are derived from the names of the std.datetime
   * enum members (expected to be the three-letter English abbreviations).
   */
  string getRFC1123Time() {
    auto sysTime = Clock.currTime(UTC());

    auto dayName = capMemberName(sysTime.dayOfWeek);
    auto monthName = capMemberName(sysTime.month);

    // Day of month, hour, minute and second are two-digit fields and the year
    // is a four-digit field, so all of them must be zero-padded.
    return format("%s, %02d %s %04d %02d:%02d:%02d GMT", dayName, sysTime.day,
      monthName, sysTime.year, sysTime.hour, sysTime.minute, sysTime.second);
  }

  import std.ascii : toUpper;
  import std.traits : EnumMembers;

  /**
   * Returns the name of the member of the enum type T which equals val, with
   * its first character converted to upper case.
   *
   * Throws: Exception if val does not equal any member of T.
   */
  string capMemberName(T)(T val) if (is(T == enum)) {
    foreach (i, e; EnumMembers!T) {
      // Both are compile-time constants, the loop is unrolled per member.
      enum name = __traits(derivedMembers, T)[i];
      enum capName = cast(char) toUpper(name[0]) ~ name [1 .. $];
      if (val == e) {
        return capName;
      }
    }
    throw new Exception("Not a member of " ~ T.stringof ~ ": " ~ to!string(val));
  }

  unittest {
    enum Foo {
      bar,
      bAZ
    }

    import std.exception;
    enforce(capMemberName(Foo.bar) == "Bar");
    enforce(capMemberName(Foo.bAZ) == "BAZ");

    // "Sun, 06 Nov 1994 08:49:37 GMT" – all fields have a fixed width.
    auto stamp = getRFC1123Time();
    enforce(stamp.length == 29);
    enforce(stamp[$ - 4 .. $] == " GMT");
  }
}