Class: Fluent::Plugin::HttpInput::Handler
- Inherits:
-
Object
- Object
- Fluent::Plugin::HttpInput::Handler
- Defined in:
- lib/fluent/plugin/in_http.rb
Constant Summary collapse
- RES_200_STATUS =
"200 OK".freeze
- RES_403_STATUS =
"403 Forbidden".freeze
Instance Attribute Summary collapse
-
#content_type ⇒ Object
readonly
Returns the value of attribute content_type.
Instance Method Summary collapse
- #close ⇒ Object
- #closing? ⇒ Boolean
-
#handle_get_request ⇒ Object
Azure App Service sends GET requests for health checking purpose.
-
#handle_options_request ⇒ Object
Web browsers can send an OPTIONS request before performing POST to check if cross-origin requests are supported.
- #include_cors_allow_origin ⇒ Object
-
#initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins, cors_allow_credentials, add_query_params) ⇒ Handler
constructor
A new instance of Handler.
- #on_body(chunk) ⇒ Object
- #on_close ⇒ Object
- #on_headers_complete(headers) ⇒ Object
- #on_message_begin ⇒ Object
- #on_message_complete ⇒ Object
- #on_read(data) ⇒ Object
- #on_write_complete ⇒ Object
- #send_response(code, header, body) ⇒ Object
- #send_response_and_close(code, header, body) ⇒ Object
- #send_response_nobody(code, header) ⇒ Object
- #step_idle ⇒ Object
Constructor Details
#initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins, cors_allow_credentials, add_query_params) ⇒ Handler
Returns a new instance of Handler.
346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 |
# File 'lib/fluent/plugin/in_http.rb', line 346 def initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins, cors_allow_credentials, add_query_params) @io = io @km = km @callback = callback @body_size_limit = body_size_limit @next_close = false @format_name = format_name @log = log @cors_allow_origins = cors_allow_origins @cors_allow_credentials = cors_allow_credentials @idle = 0 @add_query_params = add_query_params @km.add(self) @remote_port, @remote_addr = io.remote_port, io.remote_addr @parser = Http::Parser.new(self) end |
Instance Attribute Details
#content_type ⇒ Object (readonly)
Returns the value of attribute content_type.
344 345 346 |
# File 'lib/fluent/plugin/in_http.rb', line 344 def content_type @content_type end |
Instance Method Details
#close ⇒ Object
589 590 591 |
# File 'lib/fluent/plugin/in_http.rb', line 589 def close @io.close end |
#closing? ⇒ Boolean
602 603 604 |
# File 'lib/fluent/plugin/in_http.rb', line 602 def closing? @next_close end |
#handle_get_request ⇒ Object
Azure App Service sends GET requests for health checking purpose. Respond with ‘200 OK` to accommodate it.
455 456 457 |
# File 'lib/fluent/plugin/in_http.rb', line 455 def handle_get_request return send_response_and_close(RES_200_STATUS, {}, "") end |
#handle_options_request ⇒ Object
Web browsers can send an OPTIONS request before performing POST to check if cross-origin requests are supported.
461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 |
# File 'lib/fluent/plugin/in_http.rb', line 461 def # Is CORS enabled in the first place? if @cors_allow_origins.nil? return send_response_and_close(RES_403_STATUS, {}, "") end # in_http does not support HTTP methods except POST if @access_control_request_method != 'POST' return send_response_and_close(RES_403_STATUS, {}, "") end header = { "Access-Control-Allow-Methods" => "POST", "Access-Control-Allow-Headers" => @access_control_request_headers || "", } # Check the origin and send back a CORS response if @cors_allow_origins.include?('*') header["Access-Control-Allow-Origin"] = "*" send_response_and_close(RES_200_STATUS, header, "") elsif include_cors_allow_origin header["Access-Control-Allow-Origin"] = @origin if @cors_allow_credentials header["Access-Control-Allow-Credentials"] = true end send_response_and_close(RES_200_STATUS, header, "") else send_response_and_close(RES_403_STATUS, {}, "") end end |
#include_cors_allow_origin ⇒ Object
629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 |
# File 'lib/fluent/plugin/in_http.rb', line 629 def include_cors_allow_origin if @origin.nil? return false end if @cors_allow_origins.include?(@origin) return true end filtered_cors_allow_origins = @cors_allow_origins.select {|origin| origin != ""} r = filtered_cors_allow_origins.find do |origin| (start_str, end_str) = origin.split("*", 2) @origin.start_with?(start_str) && @origin.end_with?(end_str) end !r.nil? end |
#on_body(chunk) ⇒ Object
440 441 442 443 444 445 446 447 448 |
# File 'lib/fluent/plugin/in_http.rb', line 440 def on_body(chunk) if @body.bytesize + chunk.bytesize > @body_size_limit unless closing? send_response_and_close("413 Request Entity Too Large", {}, "Too large") end return end @body << chunk end |
#on_close ⇒ Object
369 370 371 |
# File 'lib/fluent/plugin/in_http.rb', line 369 def on_close @km.delete(self) end |
#on_headers_complete(headers) ⇒ Object
386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 |
# File 'lib/fluent/plugin/in_http.rb', line 386 def on_headers_complete(headers) expect = nil size = nil if @parser.http_version == [1, 1] @keep_alive = true else @keep_alive = false end @env = {} @content_type = "" @content_encoding = "" headers.each_pair {|k,v| @env["HTTP_#{k.tr('-','_').upcase}"] = v case k when /\AExpect\z/i expect = v when /\AContent-Length\Z/i size = v.to_i when /\AContent-Type\Z/i @content_type = v when /\AContent-Encoding\Z/i @content_encoding = v when /\AConnection\Z/i if /close/i.match?(v) @keep_alive = false elsif /Keep-alive/i.match?(v) @keep_alive = true end when /\AOrigin\Z/i @origin = v when /\AX-Forwarded-For\Z/i # For multiple X-Forwarded-For headers. Use first header value. v = v.first if v.is_a?(Array) @remote_addr = v.split(",").first when /\AAccess-Control-Request-Method\Z/i @access_control_request_method = v when /\AAccess-Control-Request-Headers\Z/i @access_control_request_headers = v end } if expect if expect == '100-continue'.freeze if !size || size < @body_size_limit send_response_nobody("100 Continue", {}) else send_response_and_close("413 Request Entity Too Large", {}, "Too large") end else send_response_and_close("417 Expectation Failed", {}, "") end end end |
#on_message_begin ⇒ Object
382 383 384 |
# File 'lib/fluent/plugin/in_http.rb', line 382 def @body = '' end |
#on_message_complete ⇒ Object
492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 |
# File 'lib/fluent/plugin/in_http.rb', line 492 def return if closing? if @parser.http_method == 'GET'.freeze return handle_get_request() end if @parser.http_method == 'OPTIONS'.freeze return () end # CORS check # ========== # For every incoming request, we check if we have some CORS # restrictions and allow listed origins through @cors_allow_origins. unless @cors_allow_origins.nil? unless @cors_allow_origins.include?('*') || include_cors_allow_origin send_response_and_close(RES_403_STATUS, {'Connection' => 'close'}, "") return end end # Content Encoding # ================= # Decode payload according to the "Content-Encoding" header. # For now, we only support 'gzip' and 'deflate'. begin if @content_encoding == 'gzip'.freeze @body = Zlib::GzipReader.new(StringIO.new(@body)).read elsif @content_encoding == 'deflate'.freeze @body = Zlib::Inflate.inflate(@body) end rescue @log.warn 'fails to decode payload', error: $!.to_s send_response_and_close(RES_400_STATUS, {}, "") return end @env['REMOTE_ADDR'] = @remote_addr if @remote_addr uri = URI.parse(@parser.request_url) params = WEBrick::HTTPUtils.parse_query(uri.query) if @format_name != 'default' params[EVENT_RECORD_PARAMETER] = @body elsif /^application\/x-www-form-urlencoded/.match?(@content_type) params.update WEBrick::HTTPUtils.parse_query(@body) elsif @content_type =~ /^multipart\/form-data; boundary=(.+)/ boundary = WEBrick::HTTPUtils.dequote($1) params.update WEBrick::HTTPUtils.parse_form_data(@body, boundary) elsif /^application\/json/.match?(@content_type) params['json'] = @body elsif /^application\/csp-report/.match?(@content_type) params['json'] = @body elsif /^application\/msgpack/.match?(@content_type) params['msgpack'] = @body elsif /^application\/x-ndjson/.match?(@content_type) params['ndjson'] = @body end path_info = uri.path if (@add_query_params) query_params = WEBrick::HTTPUtils.parse_query(uri.query) query_params.each_pair {|k,v| params["QUERY_#{k.tr('-','_').upcase}"] = v } end params.merge!(@env) @env.clear code, header, body = @callback.call(path_info, params) body = body.to_s header = header.dup if header.frozen? unless @cors_allow_origins.nil? if @cors_allow_origins.include?('*') header['Access-Control-Allow-Origin'] = '*' elsif include_cors_allow_origin header['Access-Control-Allow-Origin'] = @origin if @cors_allow_credentials header["Access-Control-Allow-Credentials"] = true end end end if @keep_alive header['Connection'] = 'Keep-Alive'.freeze send_response(code, header, body) else send_response_and_close(code, header, body) end end |
#on_read(data) ⇒ Object
373 374 375 376 377 378 379 380 |
# File 'lib/fluent/plugin/in_http.rb', line 373 def on_read(data) @idle = 0 @parser << data rescue @log.warn "unexpected error", error: $!.to_s @log.warn_backtrace @io.close end |
#on_write_complete ⇒ Object
593 594 595 |
# File 'lib/fluent/plugin/in_http.rb', line 593 def on_write_complete @io.close if @next_close end |
#send_response(code, header, body) ⇒ Object
606 607 608 609 610 611 612 613 614 615 616 617 618 |
# File 'lib/fluent/plugin/in_http.rb', line 606 def send_response(code, header, body) header['Content-Length'] ||= body.bytesize header['Content-Type'] ||= 'text/plain'.freeze data = %[HTTP/1.1 #{code}\r\n] header.each_pair {|k,v| data << "#{k}: #{v}\r\n" } data << "\r\n".freeze @io.write(data) @io.write(body) end |
#send_response_and_close(code, header, body) ⇒ Object
597 598 599 600 |
# File 'lib/fluent/plugin/in_http.rb', line 597 def send_response_and_close(code, header, body) send_response(code, header, body) @next_close = true end |
#send_response_nobody(code, header) ⇒ Object
620 621 622 623 624 625 626 627 |
# File 'lib/fluent/plugin/in_http.rb', line 620 def send_response_nobody(code, header) data = %[HTTP/1.1 #{code}\r\n] header.each_pair {|k,v| data << "#{k}: #{v}\r\n" } data << "\r\n".freeze @io.write(data) end |
#step_idle ⇒ Object
365 366 367 |
# File 'lib/fluent/plugin/in_http.rb', line 365 def step_idle @idle += 1 end |