Class: Fluent::Plugin::HttpInput::Handler
- Inherits:
-
Object
- Object
- Fluent::Plugin::HttpInput::Handler
- Defined in:
- lib/fluent/plugin/in_http.rb
Constant Summary collapse
- RES_200_STATUS =
"200 OK".freeze
- RES_403_STATUS =
"403 Forbidden".freeze
Instance Attribute Summary collapse
-
#content_type ⇒ Object
readonly
Returns the value of attribute content_type.
Instance Method Summary collapse
- #close ⇒ Object
- #closing? ⇒ Boolean
-
#handle_get_request ⇒ Object
Azure App Service sends GET requests for health-checking purposes.
-
#handle_options_request ⇒ Object
Web browsers can send an OPTIONS request before performing POST to check if cross-origin requests are supported.
- #include_cors_allow_origin ⇒ Object
-
#initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins, cors_allow_credentials, add_query_params) ⇒ Handler
constructor
A new instance of Handler.
- #on_body(chunk) ⇒ Object
- #on_close ⇒ Object
- #on_headers_complete(headers) ⇒ Object
- #on_message_begin ⇒ Object
- #on_message_complete ⇒ Object
- #on_read(data) ⇒ Object
- #on_write_complete ⇒ Object
- #send_response(code, header, body) ⇒ Object
- #send_response_and_close(code, header, body) ⇒ Object
- #send_response_nobody(code, header) ⇒ Object
- #step_idle ⇒ Object
Constructor Details
#initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins, cors_allow_credentials, add_query_params) ⇒ Handler
Returns a new instance of Handler.
378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 |
# File 'lib/fluent/plugin/in_http.rb', line 378
# Builds a handler around one IO object.
#
# Stores every argument in an instance variable of the same name, resets the
# close flag and idle counter, registers the handler with `km` through
# `km.add(self)`, records the peer port/address reported by `io`, and creates
# an Http::Parser that receives this handler as its callback object.
#
# @param io object responding to remote_port and remote_addr (also used for
#   write/close by the other methods of this class)
# @param km object responding to add(handler); on_close calls delete(handler)
# @param callback object responding to call(path_info, params)
# @param body_size_limit upper bound compared against the body's bytesize
# @param format_name compared against 'default' when building params
# @param log logger used for warnings
# @param cors_allow_origins nil, or a collection of allowed origins
# @param cors_allow_credentials truthy to emit Access-Control-Allow-Credentials
# @param add_query_params truthy to copy query parameters as QUERY_* params
def initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins, cors_allow_credentials, add_query_params)
  # Collaborators
  @io = io
  @km = km
  @callback = callback
  @log = log

  # Request-handling settings
  @body_size_limit = body_size_limit
  @format_name = format_name
  @add_query_params = add_query_params
  @cors_allow_origins = cors_allow_origins
  @cors_allow_credentials = cors_allow_credentials

  # Connection state
  @next_close = false
  @idle = 0

  @km.add(self)
  @remote_port, @remote_addr = io.remote_port, io.remote_addr
  @parser = Http::Parser.new(self)
end
Instance Attribute Details
#content_type ⇒ Object (readonly)
Returns the value of attribute content_type.
376 377 378 |
# File 'lib/fluent/plugin/in_http.rb', line 376
# Reader for the Content-Type value stored in @content_type.
#
# @return the current value of @content_type (nil if it was never assigned)
def content_type
  @content_type
end
Instance Method Details
#close ⇒ Object
619 620 621 |
# File 'lib/fluent/plugin/in_http.rb', line 619
# Closes the underlying IO object immediately.
#
# @return whatever @io.close returns
def close
  @io.close
end
#closing? ⇒ Boolean
632 633 634 |
# File 'lib/fluent/plugin/in_http.rb', line 632
# Tells whether a close has been scheduled for this handler.
#
# @return the value of @next_close, which send_response_and_close sets to true
def closing?
  @next_close
end
#handle_get_request ⇒ Object
Azure App Service sends GET requests for health-checking purposes. Respond with `200 OK` to accommodate them.
487 488 489 |
# File 'lib/fluent/plugin/in_http.rb', line 487
# Answers a GET request with an empty-bodied response carrying the
# RES_200_STATUS status line, then schedules the connection to be closed.
#
# @return the result of send_response_and_close
def handle_get_request
  no_headers = {}
  empty_body = ""
  send_response_and_close(RES_200_STATUS, no_headers, empty_body)
end
#handle_options_request ⇒ Object
Web browsers can send an OPTIONS request before performing POST to check if cross-origin requests are supported.
493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 |
# File 'lib/fluent/plugin/in_http.rb', line 493
# Answers a CORS preflight (OPTIONS) request and schedules the connection to
# be closed.
#
# Responds with RES_403_STATUS when CORS is not configured, when the
# requested method is anything other than POST, or when the request origin
# is not allowed. Otherwise responds with RES_200_STATUS plus the
# Access-Control-* headers.
#
# @return the result of send_response_and_close
def handle_options_request
  # Is CORS enabled in the first place?
  return send_response_and_close(RES_403_STATUS, {}, "") if @cors_allow_origins.nil?

  # in_http does not support HTTP methods except POST
  return send_response_and_close(RES_403_STATUS, {}, "") if @access_control_request_method != 'POST'

  header = {
    "Access-Control-Allow-Methods" => "POST",
    # Echo back whatever headers the client asked for (empty when none).
    "Access-Control-Allow-Headers" => @access_control_request_headers || "",
  }

  # Check the origin and send back a CORS response
  if @cors_allow_origins.include?('*')
    header["Access-Control-Allow-Origin"] = "*"
    send_response_and_close(RES_200_STATUS, header, "")
  elsif include_cors_allow_origin
    header["Access-Control-Allow-Origin"] = @origin
    # Credentials are only advertised for a specific (non-"*") origin.
    header["Access-Control-Allow-Credentials"] = true if @cors_allow_credentials
    send_response_and_close(RES_200_STATUS, header, "")
  else
    send_response_and_close(RES_403_STATUS, {}, "")
  end
end
#include_cors_allow_origin ⇒ Object
659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 |
# File 'lib/fluent/plugin/in_http.rb', line 659
# Checks whether the request's Origin (@origin) is permitted by
# @cors_allow_origins.
#
# An origin is permitted when it equals one of the configured entries, or
# when it matches an entry containing a "*" wildcard: the text before the
# first "*" must be a prefix of the origin and the text after it a suffix.
#
# @return [Boolean] false when @origin is nil or nothing matches
def include_cors_allow_origin
  return false if @origin.nil?
  return true if @cors_allow_origins.include?(@origin)

  @cors_allow_origins.any? do |pattern|
    # Entries without "*" are exact-match only and were handled above.
    # Splitting them would produce a nil suffix, and String#end_with?(nil)
    # raises TypeError. This also skips empty entries.
    next false unless pattern.include?("*")

    prefix, suffix = pattern.split("*", 2)
    # The length check keeps prefix and suffix from overlapping, so the
    # wildcard always stands for zero or more real characters.
    @origin.length >= prefix.length + suffix.length &&
      @origin.start_with?(prefix) &&
      @origin.end_with?(suffix)
  end
end
#on_body(chunk) ⇒ Object
472 473 474 475 476 477 478 479 480 |
# File 'lib/fluent/plugin/in_http.rb', line 472
# Parser callback for a piece of the request body.
#
# Appends the chunk to @body as long as the accumulated size stays within
# @body_size_limit. Once a chunk would exceed the limit it is dropped and,
# unless a close is already scheduled, a 413 response is sent and the
# connection is scheduled to be closed.
#
# @param chunk [String] body fragment
# @return [String, nil] @body after appending, or nil when the chunk was dropped
def on_body(chunk)
  if @body.bytesize + chunk.bytesize <= @body_size_limit
    @body << chunk
  else
    # Only answer once; later chunks of the same oversized body are ignored.
    send_response_and_close("413 Request Entity Too Large", {}, "Too large") unless closing?
    nil
  end
end
#on_close ⇒ Object
401 402 403 |
# File 'lib/fluent/plugin/in_http.rb', line 401
# Close callback: removes this handler from @km, the object it registered
# itself with in the constructor.
#
# @return whatever @km.delete returns
def on_close
  @km.delete(self)
end
#on_headers_complete(headers) ⇒ Object
418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 |
# File 'lib/fluent/plugin/in_http.rb', line 418
# Parser callback invoked once all request headers are available.
#
# Resets the per-request state, copies every header into @env under
# "HTTP_<UPPER_SNAKE_NAME>", and extracts the headers this handler acts on
# (Content-Type, Content-Encoding, Connection, Origin, X-Forwarded-For and
# the Access-Control-Request-* pair). It then answers an Expect header:
# "100 Continue" when no Content-Length was given or it is below
# @body_size_limit, 413 otherwise, and 417 for any other expectation.
#
# @param headers hash-like object yielding header name/value pairs
# @return nil when there is no Expect header, otherwise the result of the
#   response method that was called
def on_headers_complete(headers)
  expect = nil
  size = nil

  if @parser.http_version == [1, 1]
    @keep_alive = true
  else
    @keep_alive = false
  end
  @env = {}
  @content_type = ""
  @content_encoding = ""
  # The handler is reused for every request that arrives on the same
  # connection, so CORS-related values must not leak from one request into
  # the next one that omits these headers.
  @origin = nil
  @access_control_request_method = nil
  @access_control_request_headers = nil

  headers.each_pair {|k,v|
    @env["HTTP_#{k.gsub('-','_').upcase}"] = v
    case k
    when /\AExpect\z/i
      expect = v
    when /\AContent-Length\Z/i
      size = v.to_i
    when /\AContent-Type\Z/i
      @content_type = v
    when /\AContent-Encoding\Z/i
      @content_encoding = v
    when /\AConnection\Z/i
      # An explicit Connection header overrides the HTTP-version default.
      if v =~ /close/i
        @keep_alive = false
      elsif v =~ /Keep-alive/i
        @keep_alive = true
      end
    when /\AOrigin\Z/i
      @origin = v
    when /\AX-Forwarded-For\Z/i
      # For multiple X-Forwarded-For headers. Use first header value.
      v = v.first if v.is_a?(Array)
      @remote_addr = v.split(",").first
    when /\AAccess-Control-Request-Method\Z/i
      @access_control_request_method = v
    when /\AAccess-Control-Request-Headers\Z/i
      @access_control_request_headers = v
    end
  }

  if expect
    if expect == '100-continue'.freeze
      if !size || size < @body_size_limit
        send_response_nobody("100 Continue", {})
      else
        send_response_and_close("413 Request Entity Too Large", {}, "Too large")
      end
    else
      send_response_and_close("417 Expectation Failed", {}, "")
    end
  end
end
#on_message_begin ⇒ Object
414 415 416 |
# File 'lib/fluent/plugin/in_http.rb', line 414
# Parser callback for the start of a request: starts a fresh, empty body
# buffer that on_body appends to.
#
# @return [String] the new empty buffer
def on_message_begin
  # on_body mutates the buffer with <<, so make sure it is never a frozen
  # literal (e.g. under `# frozen_string_literal: true`).
  @body = ''.dup
end
#on_message_complete ⇒ Object
524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 |
# File 'lib/fluent/plugin/in_http.rb', line 524
# Parser callback invoked when a full request has been received.
#
# Does nothing if a close is already scheduled. GET and OPTIONS requests are
# delegated to handle_get_request / handle_options_request. For any other
# method the request is CORS-checked, the body is decoded according to
# Content-Encoding, request parameters are assembled from the query string,
# the body and the HTTP_* environment, and @callback is called with the URL
# path and those parameters. The callback's (code, header, body) triple is
# sent back, keeping the connection open only when @keep_alive is set.
#
# @return nil, or the result of the response method that was called
def on_message_complete
  return if closing?

  if @parser.http_method == 'GET'.freeze
    return handle_get_request()
  end

  if @parser.http_method == 'OPTIONS'.freeze
    return handle_options_request()
  end

  # CORS check
  # ==========
  # For every incoming request, we check if we have some CORS
  # restrictions and allow listed origins through @cors_allow_origins.
  unless @cors_allow_origins.nil?
    unless @cors_allow_origins.include?('*') || include_cors_allow_origin
      send_response_and_close(RES_403_STATUS, {'Connection' => 'close'}, "")
      return
    end
  end

  # Content Encoding
  # =================
  # Decode payload according to the "Content-Encoding" header.
  # For now, we only support 'gzip' and 'deflate'.
  begin
    if @content_encoding == 'gzip'.freeze
      @body = Zlib::GzipReader.new(StringIO.new(@body)).read
    elsif @content_encoding == 'deflate'.freeze
      @body = Zlib::Inflate.inflate(@body)
    end
  rescue
    @log.warn 'fails to decode payload', error: $!.to_s
    # Use RES_400_STATUS when it is defined in an enclosing scope; fall back
    # to the literal status line so an undecodable payload yields a 400
    # response rather than a NameError.
    bad_request_status = defined?(RES_400_STATUS) ? RES_400_STATUS : "400 Bad Request"
    send_response_and_close(bad_request_status, {}, "")
    return
  end

  @env['REMOTE_ADDR'] = @remote_addr if @remote_addr

  uri = URI.parse(@parser.request_url)
  params = WEBrick::HTTPUtils.parse_query(uri.query)

  # Place the body under the key that matches the configured format or the
  # request's Content-Type.
  if @format_name != 'default'
    params[EVENT_RECORD_PARAMETER] = @body
  elsif @content_type =~ /^application\/x-www-form-urlencoded/
    params.update WEBrick::HTTPUtils.parse_query(@body)
  elsif @content_type =~ /^multipart\/form-data; boundary=(.+)/
    boundary = WEBrick::HTTPUtils.dequote($1)
    params.update WEBrick::HTTPUtils.parse_form_data(@body, boundary)
  elsif @content_type =~ /^application\/json/
    params['json'] = @body
  elsif @content_type =~ /^application\/msgpack/
    params['msgpack'] = @body
  elsif @content_type =~ /^application\/x-ndjson/
    params['ndjson'] = @body
  end
  path_info = uri.path

  if (@add_query_params)
    # Parsed again on purpose: params may already have been modified above.
    query_params = WEBrick::HTTPUtils.parse_query(uri.query)

    query_params.each_pair {|k,v|
      params["QUERY_#{k.gsub('-','_').upcase}"] = v
    }
  end

  # HTTP_* / REMOTE_ADDR entries override same-named request parameters.
  params.merge!(@env)

  @env.clear

  code, header, body = @callback.call(path_info, params)
  body = body.to_s
  # The header hash is mutated below, so work on a copy if it is frozen.
  header = header.dup if header.frozen?

  unless @cors_allow_origins.nil?
    if @cors_allow_origins.include?('*')
      header['Access-Control-Allow-Origin'] = '*'
    elsif include_cors_allow_origin
      header['Access-Control-Allow-Origin'] = @origin
      if @cors_allow_credentials
        header["Access-Control-Allow-Credentials"] = true
      end
    end
  end

  if @keep_alive
    header['Connection'] = 'Keep-Alive'.freeze
    send_response(code, header, body)
  else
    send_response_and_close(code, header, body)
  end
end
#on_read(data) ⇒ Object
405 406 407 408 409 410 411 412 |
# File 'lib/fluent/plugin/in_http.rb', line 405
# Read callback: resets the idle counter and feeds the received bytes to the
# HTTP parser. Any StandardError raised while doing so is logged as a
# warning (with backtrace) and the IO object is closed.
#
# @param data bytes read from the connection
def on_read(data)
  begin
    @idle = 0
    @parser << data
  rescue => e
    @log.warn "unexpected error", error: e.to_s
    @log.warn_backtrace
    @io.close
  end
end
#on_write_complete ⇒ Object
623 624 625 |
# File 'lib/fluent/plugin/in_http.rb', line 623
# Write-complete callback: closes the IO object if a close was scheduled
# (see send_response_and_close); otherwise does nothing.
#
# @return nil when no close is scheduled, else whatever @io.close returns
def on_write_complete
  return unless @next_close

  @io.close
end
#send_response(code, header, body) ⇒ Object
636 637 638 639 640 641 642 643 644 645 646 647 648 |
# File 'lib/fluent/plugin/in_http.rb', line 636
# Writes an HTTP/1.1 response (status line, headers, blank line, body) to @io.
#
# Content-Length (the body's bytesize) and Content-Type (text/plain) are
# filled in when the caller did not supply them; note that this modifies the
# header hash that was passed in.
#
# @param code [String] status code and reason phrase, e.g. "200 OK"
# @param header [Hash] response headers; mutated as described above
# @param body [String] response body
# @return whatever the final @io.write returns
def send_response(code, header, body)
  header['Content-Length'] ||= body.bytesize
  header['Content-Type'] ||= 'text/plain'.freeze

  head = %[HTTP/1.1 #{code}\r\n]
  head << header.each_pair.map { |name, value| "#{name}: #{value}\r\n" }.join
  head << "\r\n".freeze

  @io.write(head)
  @io.write(body)
end
#send_response_and_close(code, header, body) ⇒ Object
627 628 629 630 |
# File 'lib/fluent/plugin/in_http.rb', line 627
# Sends a response and schedules the connection to be closed: @next_close is
# set, which makes closing? true and lets on_write_complete close the IO.
#
# @param code [String] status code and reason phrase
# @param header [Hash] response headers
# @param body [String] response body
# @return [true]
def send_response_and_close(code, header, body)
  send_response(code, header, body)
  # Set only after the response has been handed to send_response.
  @next_close = true
end
#send_response_nobody(code, header) ⇒ Object
650 651 652 653 654 655 656 657 |
# File 'lib/fluent/plugin/in_http.rb', line 650
# Writes a body-less HTTP/1.1 response (status line, the given headers and
# the terminating blank line) to @io. Unlike send_response, no default
# headers are added.
#
# @param code [String] status code and reason phrase, e.g. "100 Continue"
# @param header [Hash] response headers
# @return whatever @io.write returns
def send_response_nobody(code, header)
  head = %[HTTP/1.1 #{code}\r\n]
  head << header.each_pair.map { |name, value| "#{name}: #{value}\r\n" }.join
  head << "\r\n".freeze
  @io.write(head)
end
#step_idle ⇒ Object
397 398 399 |
# File 'lib/fluent/plugin/in_http.rb', line 397
# Advances the idle counter by one. The counter starts at 0 in the
# constructor and is reset to 0 by on_read.
#
# @return [Integer] the new counter value
def step_idle
  @idle = @idle + 1
end