Class: Fluent::Plugin::HttpInput::Handler
- Inherits:
-
Object
- Object
- Fluent::Plugin::HttpInput::Handler
- Defined in:
- lib/fluent/plugin/in_http.rb
Constant Summary collapse
- RES_200_STATUS =
"200 OK".freeze
- RES_403_STATUS =
"403 Forbidden".freeze
Instance Attribute Summary collapse
-
#content_type ⇒ Object
readonly
Returns the value of attribute content_type.
Instance Method Summary collapse
- #close ⇒ Object
- #closing? ⇒ Boolean
-
#handle_get_request ⇒ Object
Azure App Service sends GET requests for health-checking purposes.
-
#handle_options_request ⇒ Object
Web browsers can send an OPTIONS request before performing POST to check if cross-origin requests are supported.
- #include_cors_allow_origin ⇒ Object
-
#initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins, add_query_params) ⇒ Handler
constructor
A new instance of Handler.
- #on_body(chunk) ⇒ Object
- #on_close ⇒ Object
- #on_headers_complete(headers) ⇒ Object
- #on_message_begin ⇒ Object
- #on_message_complete ⇒ Object
- #on_read(data) ⇒ Object
- #on_write_complete ⇒ Object
- #send_response(code, header, body) ⇒ Object
- #send_response_and_close(code, header, body) ⇒ Object
- #send_response_nobody(code, header) ⇒ Object
- #step_idle ⇒ Object
Constructor Details
#initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins, add_query_params) ⇒ Handler
Returns a new instance of Handler.
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 |
# File 'lib/fluent/plugin/in_http.rb', line 359

# Builds a handler for a single connection.
#
# Stores the collaborators, resets the per-connection flags, registers the
# handler with +km+ and creates an Http::Parser that reports back to this
# object.
#
# @param io [Object] connection object; must respond to #remote_port and #remote_addr
# @param km [Object] registry this handler adds itself to (presumably the keep-alive manager - confirm)
# @param callback [#call] invoked with (path_info, params) once a request is complete
# @param body_size_limit [Integer] maximum accepted body size in bytes
# @param format_name [String] name of the configured format; 'default' enables content-type dispatch
# @param log [Object] logger used for warnings
# @param cors_allow_origins [Array<String>, nil] allowed origins, nil when CORS is disabled
# @param add_query_params [Boolean] whether query parameters are also exposed as QUERY_* entries
def initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins, add_query_params)
  # Collaborators and configuration.
  @io = io
  @km = km
  @callback = callback
  @body_size_limit = body_size_limit
  @format_name = format_name
  @log = log
  @cors_allow_origins = cors_allow_origins
  @add_query_params = add_query_params

  # Per-connection state.
  @next_close = false
  @idle = 0

  @km.add(self)

  @remote_port = io.remote_port
  @remote_addr = io.remote_addr
  @parser = Http::Parser.new(self)
end
Instance Attribute Details
#content_type ⇒ Object (readonly)
Returns the value of attribute content_type.
357 358 359 |
# File 'lib/fluent/plugin/in_http.rb', line 357

# Reader for the Content-Type value recorded for the current request.
#
# @return [String, nil] the stored value of @content_type
def content_type
  @content_type
end
Instance Method Details
#close ⇒ Object
590 591 592 |
# File 'lib/fluent/plugin/in_http.rb', line 590

# Closes the underlying connection object immediately.
#
# @return [Object] whatever @io.close returns
def close
  @io.close
end
#closing? ⇒ Boolean
603 604 605 |
# File 'lib/fluent/plugin/in_http.rb', line 603

# Tells whether the connection has been flagged to close once the pending
# write completes.
#
# @return [Boolean] the current value of @next_close
def closing?
  @next_close
end
#handle_get_request ⇒ Object
Azure App Service sends GET requests for health-checking purposes. Respond with `200 OK` to accommodate it.
466 467 468 |
# File 'lib/fluent/plugin/in_http.rb', line 466

# Answers a GET request with an empty `200 OK` and schedules the connection
# to close. Azure App Service sends GET requests for health checking.
#
# @return [Object] the result of #send_response_and_close
def handle_get_request
  send_response_and_close(RES_200_STATUS, {}, "")
end
#handle_options_request ⇒ Object
Web browsers can send an OPTIONS request before performing POST to check if cross-origin requests are supported.
472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 |
# File 'lib/fluent/plugin/in_http.rb', line 472

# Answers a CORS preflight (OPTIONS) request and schedules the connection to
# close. Web browsers can send an OPTIONS request before performing POST to
# check if cross-origin requests are supported.
#
# Replies `403 Forbidden` when CORS is disabled, when the requested method is
# not POST, or when the origin is not allowed; otherwise replies `200 OK`
# with the Access-Control-Allow-* headers.
#
# @return [Object] the result of #send_response_and_close
def handle_options_request
  # Is CORS enabled in the first place?
  return send_response_and_close(RES_403_STATUS, {}, "") if @cors_allow_origins.nil?

  # in_http does not support HTTP methods except POST
  return send_response_and_close(RES_403_STATUS, {}, "") if @access_control_request_method != 'POST'

  header = {
    "Access-Control-Allow-Methods" => "POST",
    # Echo back the headers the browser asked for (empty when none were sent).
    "Access-Control-Allow-Headers" => @access_control_request_headers || "",
  }

  # Check the origin and send back a CORS response
  if @cors_allow_origins.include?('*')
    header["Access-Control-Allow-Origin"] = "*"
    send_response_and_close(RES_200_STATUS, header, "")
  elsif include_cors_allow_origin
    header["Access-Control-Allow-Origin"] = @origin
    send_response_and_close(RES_200_STATUS, header, "")
  else
    send_response_and_close(RES_403_STATUS, {}, "")
  end
end
#include_cors_allow_origin ⇒ Object
630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 |
# File 'lib/fluent/plugin/in_http.rb', line 630

# Checks whether the request's Origin is covered by @cors_allow_origins.
#
# An origin is accepted when it equals an entry exactly, or when it matches
# an entry containing a single `*` wildcard (text before the `*` must be a
# prefix of the origin, text after it a suffix).
#
# @return [Boolean] true when @origin is allowed, false otherwise
#   (always false when no Origin was received)
def include_cors_allow_origin
  return false if @origin.nil?
  return true if @cors_allow_origins.include?(@origin)

  @cors_allow_origins.any? do |pattern|
    # Entries without "*" are exact-match only and were handled above.
    # Splitting them would yield a nil suffix and make end_with? raise.
    next false unless pattern.include?("*")

    prefix, suffix = pattern.split("*", 2)
    # The length guard stops prefix and suffix from overlapping, so
    # "http://a*a" does not match "http://a".
    @origin.length >= prefix.length + suffix.length &&
      @origin.start_with?(prefix) &&
      @origin.end_with?(suffix)
  end
end
#on_body(chunk) ⇒ Object
451 452 453 454 455 456 457 458 459 |
# File 'lib/fluent/plugin/in_http.rb', line 451

# Parser callback: appends a body chunk to the buffered request body.
#
# When the chunk would push the body past @body_size_limit, it is dropped
# and, unless the connection is already closing, a 413 response is sent.
#
# @param chunk [String] the next piece of the request body
# @return [String, nil] the body buffer after appending, or nil when the chunk was rejected
def on_body(chunk)
  fits = @body.bytesize + chunk.bytesize <= @body_size_limit
  return @body << chunk if fits

  send_response_and_close("413 Request Entity Too Large", {}, "Too large") unless closing?
  nil
end
#on_close ⇒ Object
380 381 382 |
# File 'lib/fluent/plugin/in_http.rb', line 380

# Connection-closed callback: removes this handler from the registry it
# added itself to in the constructor.
#
# @return [Object] whatever @km.delete returns
def on_close
  @km.delete(self)
end
#on_headers_complete(headers) ⇒ Object
397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 |
# File 'lib/fluent/plugin/in_http.rb', line 397

# Parser callback: runs once all request headers have been parsed.
#
# Resets the per-request state, copies every header into @env as
# HTTP_<NAME>, records the headers this handler cares about, and answers an
# `Expect` header immediately (100 / 413 / 417).
#
# @param headers [Hash] header name => value (a value may be an Array when
#   the header was repeated, as handled for X-Forwarded-For)
# @return [Object, nil] result of the response sent for `Expect`, or nil
def on_headers_complete(headers)
  expect = nil
  size = nil

  # HTTP/1.1 defaults to persistent connections; a Connection header below
  # may override this.
  @keep_alive = (@parser.http_version == [1, 1])

  @env = {}
  @content_type = ""
  @content_encoding = ""
  # One connection can carry several requests, so the CORS-related values
  # from the previous request must not leak into this one.
  @origin = nil
  @access_control_request_method = nil
  @access_control_request_headers = nil

  headers.each_pair {|k,v|
    @env["HTTP_#{k.tr('-', '_').upcase}"] = v
    case k
    when /\AExpect\z/i
      expect = v
    when /\AContent-Length\Z/i
      size = v.to_i
    when /\AContent-Type\Z/i
      @content_type = v
    when /\AContent-Encoding\Z/i
      @content_encoding = v
    when /\AConnection\Z/i
      if v =~ /close/i
        @keep_alive = false
      elsif v =~ /Keep-alive/i
        @keep_alive = true
      end
    when /\AOrigin\Z/i
      @origin = v
    when /\AX-Forwarded-For\Z/i
      # For multiple X-Forwarded-For headers. Use first header value.
      v = v.first if v.is_a?(Array)
      @remote_addr = v.split(",").first
    when /\AAccess-Control-Request-Method\Z/i
      @access_control_request_method = v
    when /\AAccess-Control-Request-Headers\Z/i
      @access_control_request_headers = v
    end
  }

  if expect
    # The Expect field value is case-insensitive.
    if expect.to_s.casecmp('100-continue') == 0
      # `<=` matches #on_body, which accepts a body of exactly
      # @body_size_limit bytes.
      if !size || size <= @body_size_limit
        send_response_nobody("100 Continue", {})
      else
        send_response_and_close("413 Request Entity Too Large", {}, "Too large")
      end
    else
      send_response_and_close("417 Expectation Failed", {}, "")
    end
  end
end
#on_message_begin ⇒ Object
393 394 395 |
# File 'lib/fluent/plugin/in_http.rb', line 393

# Parser callback: a new request is starting, so start from an empty body
# buffer.
#
# @return [String] the fresh, empty buffer
def on_message_begin
  # #on_body appends to this buffer with <<, so it must be mutable even if
  # string literals are frozen in the enclosing file.
  @body = ''.dup
end
#on_message_complete ⇒ Object
500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 |
# File 'lib/fluent/plugin/in_http.rb', line 500

# Parser callback: the whole request (headers and body) has been received.
#
# Dispatches GET and OPTIONS to their dedicated handlers. For other methods
# it enforces the CORS allow-list, decodes the body, builds the parameter
# hash, invokes the callback and sends the response it returns.
#
# RES_400_STATUS and EVENT_RECORD_PARAMETER are not defined in this class's
# listing; presumably they come from the enclosing HttpInput - confirm.
#
# @return [Object, nil] result of the response that was sent, or nil when the
#   request was ignored or rejected early
def on_message_complete
  # A response that closes the connection was already queued.
  return if closing?

  return handle_get_request if @parser.http_method == 'GET'.freeze
  return handle_options_request if @parser.http_method == 'OPTIONS'.freeze

  # CORS check
  # ==========
  # For every incoming request, we check if we have some CORS
  # restrictions and allow listed origins through @cors_allow_origins.
  unless @cors_allow_origins.nil?
    unless @cors_allow_origins.include?('*') || include_cors_allow_origin
      send_response_and_close(RES_403_STATUS, {'Connection' => 'close'}, "")
      return
    end
  end

  # Content Encoding
  # =================
  # Decode payload according to the "Content-Encoding" header.
  # For now, we only support 'gzip' and 'deflate'.
  begin
    if @content_encoding == 'gzip'.freeze
      # The block form closes the reader once the payload has been read.
      @body = Zlib::GzipReader.wrap(StringIO.new(@body)) { |gz| gz.read }
    elsif @content_encoding == 'deflate'.freeze
      @body = Zlib::Inflate.inflate(@body)
    end
  rescue => e
    @log.warn 'fails to decode payload', error: e.to_s
    send_response_and_close(RES_400_STATUS, {}, "")
    return
  end

  @env['REMOTE_ADDR'] = @remote_addr if @remote_addr

  uri = URI.parse(@parser.request_url)
  params = WEBrick::HTTPUtils.parse_query(uri.query)

  # With a non-default format the raw body is handed over as-is; otherwise
  # the Content-Type decides how the body is exposed.
  if @format_name != 'default'
    params[EVENT_RECORD_PARAMETER] = @body
  elsif @content_type =~ /^application\/x-www-form-urlencoded/
    params.update WEBrick::HTTPUtils.parse_query(@body)
  elsif @content_type =~ /^multipart\/form-data; boundary=(.+)/
    boundary = WEBrick::HTTPUtils.dequote($1)
    params.update WEBrick::HTTPUtils.parse_form_data(@body, boundary)
  elsif @content_type =~ /^application\/json/
    params['json'] = @body
  elsif @content_type =~ /^application\/msgpack/
    params['msgpack'] = @body
  end
  path_info = uri.path

  if @add_query_params
    # Parsed again because params may already contain body fields.
    query_params = WEBrick::HTTPUtils.parse_query(uri.query)
    query_params.each_pair {|k,v|
      params["QUERY_#{k.tr('-', '_').upcase}"] = v
    }
  end

  params.merge!(@env)
  @env.clear

  code, header, body = @callback.call(path_info, params)
  body = body.to_s
  # A frozen header hash cannot take the entries added below.
  header = header.dup if header.frozen?

  unless @cors_allow_origins.nil?
    if @cors_allow_origins.include?('*')
      header['Access-Control-Allow-Origin'] = '*'
    elsif include_cors_allow_origin
      header['Access-Control-Allow-Origin'] = @origin
    end
  end

  if @keep_alive
    header['Connection'] = 'Keep-Alive'.freeze
    send_response(code, header, body)
  else
    send_response_and_close(code, header, body)
  end
end
#on_read(data) ⇒ Object
384 385 386 387 388 389 390 391 |
# File 'lib/fluent/plugin/in_http.rb', line 384

# Data-received callback: resets the idle counter and feeds the bytes to the
# HTTP parser. Any error raised while parsing (including from the parser
# callbacks) is logged as a warning and the connection is closed.
#
# @param data [String] bytes read from the connection
def on_read(data)
  begin
    @idle = 0
    @parser << data
  rescue => e
    @log.warn "unexpected error", error: e.to_s
    @log.warn_backtrace
    @io.close
  end
end
#on_write_complete ⇒ Object
594 595 596 |
# File 'lib/fluent/plugin/in_http.rb', line 594

# Write-finished callback: closes the connection if a close was scheduled by
# #send_response_and_close, otherwise does nothing.
#
# @return [Object, nil] result of @io.close, or nil when nothing was closed
def on_write_complete
  return unless @next_close

  @io.close
end
#send_response(code, header, body) ⇒ Object
607 608 609 610 611 612 613 614 615 616 617 618 619 |
# File 'lib/fluent/plugin/in_http.rb', line 607

# Writes a complete HTTP/1.1 response (status line, headers, body) to the
# connection.
#
# `Content-Length` (body size in bytes) and `Content-Type` (`text/plain`) are
# filled in when the caller did not provide them.
#
# @param code [String] status code and reason phrase, e.g. "200 OK"
# @param header [Hash] response headers; left unmodified
# @param body [String] response body
# @return [Object] whatever the final @io.write returns
def send_response(code, header, body)
  # Fill the defaults into a copy. Writing them into the caller's hash would
  # make a reused hash keep the first response's Content-Length, and a frozen
  # hash would raise.
  header = header.dup
  header['Content-Length'] ||= body.bytesize
  header['Content-Type'] ||= 'text/plain'.freeze

  data = %[HTTP/1.1 #{code}\r\n]
  header.each_pair {|k,v|
    data << "#{k}: #{v}\r\n"
  }
  data << "\r\n".freeze

  @io.write(data)
  @io.write(body)
end
#send_response_and_close(code, header, body) ⇒ Object
598 599 600 601 |
# File 'lib/fluent/plugin/in_http.rb', line 598

# Sends a full response and flags the connection to be closed once the write
# has completed (see #on_write_complete).
#
# @param code [String] status code and reason phrase
# @param header [Hash] response headers
# @param body [String] response body
# @return [true]
def send_response_and_close(code, header, body)
  send_response(code, header, body)

  # The close itself is deferred until the response has been written out.
  @next_close = true
end
#send_response_nobody(code, header) ⇒ Object
621 622 623 624 625 626 627 628 |
# File 'lib/fluent/plugin/in_http.rb', line 621

# Writes a body-less HTTP/1.1 response: the status line, the given headers
# and the terminating blank line. No default headers are added.
#
# @param code [String] status code and reason phrase, e.g. "100 Continue"
# @param header [Hash] response headers
# @return [Object] whatever @io.write returns
def send_response_nobody(code, header)
  header_lines = header.map { |name, value| "#{name}: #{value}\r\n" }.join
  @io.write("HTTP/1.1 #{code}\r\n#{header_lines}\r\n")
end
#step_idle ⇒ Object
376 377 378 |
# File 'lib/fluent/plugin/in_http.rb', line 376

# Advances the idle counter by one tick. The counter is reset to zero
# whenever data is read (see #on_read).
#
# @return [Integer] the updated idle count
def step_idle
  @idle = @idle + 1
end