Class: Fluent::Plugin::HttpInput::Handler
- Inherits:
-
Object
- Object
- Fluent::Plugin::HttpInput::Handler
- Defined in:
- lib/fluent/plugin/in_http.rb
Constant Summary collapse
- RES_200_STATUS =
"200 OK".freeze
- RES_403_STATUS =
"403 Forbidden".freeze
Instance Attribute Summary collapse
-
#content_type ⇒ Object
readonly
Returns the value of attribute content_type.
Instance Method Summary collapse
- #close ⇒ Object
- #closing? ⇒ Boolean
-
#handle_get_request ⇒ Object
Azure App Service sends GET requests for health checking purpose.
-
#handle_options_request ⇒ Object
Web browsers can send an OPTIONS request before performing POST to check if cross-origin requests are supported.
- #include_cors_allow_origin ⇒ Object
-
#initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins, cors_allow_credentials, add_query_params) ⇒ Handler
constructor
A new instance of Handler.
- #on_body(chunk) ⇒ Object
- #on_close ⇒ Object
- #on_headers_complete(headers) ⇒ Object
- #on_message_begin ⇒ Object
- #on_message_complete ⇒ Object
- #on_read(data) ⇒ Object
- #on_write_complete ⇒ Object
- #send_response(code, header, body) ⇒ Object
- #send_response_and_close(code, header, body) ⇒ Object
- #send_response_nobody(code, header) ⇒ Object
- #step_idle ⇒ Object
Constructor Details
#initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins, cors_allow_credentials, add_query_params) ⇒ Handler
Returns a new instance of Handler.
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 |
# File 'lib/fluent/plugin/in_http.rb', line 341 def initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins, cors_allow_credentials, add_query_params) @io = io @km = km @callback = callback @body_size_limit = body_size_limit @next_close = false @format_name = format_name @log = log @cors_allow_origins = cors_allow_origins @cors_allow_credentials = cors_allow_credentials @idle = 0 @add_query_params = add_query_params @km.add(self) @remote_port, @remote_addr = io.remote_port, io.remote_addr @parser = Http::Parser.new(self) end |
Instance Attribute Details
#content_type ⇒ Object (readonly)
Returns the value of attribute content_type.
339 340 341 |
# File 'lib/fluent/plugin/in_http.rb', line 339 def content_type @content_type end |
Instance Method Details
#close ⇒ Object
584 585 586 |
# File 'lib/fluent/plugin/in_http.rb', line 584 def close @io.close end |
#closing? ⇒ Boolean
597 598 599 |
# File 'lib/fluent/plugin/in_http.rb', line 597 def closing? @next_close end |
#handle_get_request ⇒ Object
Azure App Service sends GET requests for health checking purpose. Respond with ‘200 OK` to accommodate it.
450 451 452 |
# File 'lib/fluent/plugin/in_http.rb', line 450 def handle_get_request return send_response_and_close(RES_200_STATUS, {}, "") end |
#handle_options_request ⇒ Object
Web browsers can send an OPTIONS request before performing POST to check if cross-origin requests are supported.
456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 |
# File 'lib/fluent/plugin/in_http.rb', line 456 def # Is CORS enabled in the first place? if @cors_allow_origins.nil? return send_response_and_close(RES_403_STATUS, {}, "") end # in_http does not support HTTP methods except POST if @access_control_request_method != 'POST' return send_response_and_close(RES_403_STATUS, {}, "") end header = { "Access-Control-Allow-Methods" => "POST", "Access-Control-Allow-Headers" => @access_control_request_headers || "", } # Check the origin and send back a CORS response if @cors_allow_origins.include?('*') header["Access-Control-Allow-Origin"] = "*" send_response_and_close(RES_200_STATUS, header, "") elsif include_cors_allow_origin header["Access-Control-Allow-Origin"] = @origin if @cors_allow_credentials header["Access-Control-Allow-Credentials"] = true end send_response_and_close(RES_200_STATUS, header, "") else send_response_and_close(RES_403_STATUS, {}, "") end end |
#include_cors_allow_origin ⇒ Object
624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 |
# File 'lib/fluent/plugin/in_http.rb', line 624 def include_cors_allow_origin if @origin.nil? return false end if @cors_allow_origins.include?(@origin) return true end filtered_cors_allow_origins = @cors_allow_origins.select {|origin| origin != ""} r = filtered_cors_allow_origins.find do |origin| (start_str, end_str) = origin.split("*", 2) @origin.start_with?(start_str) && @origin.end_with?(end_str) end !r.nil? end |
#on_body(chunk) ⇒ Object
435 436 437 438 439 440 441 442 443 |
# File 'lib/fluent/plugin/in_http.rb', line 435 def on_body(chunk) if @body.bytesize + chunk.bytesize > @body_size_limit unless closing? send_response_and_close("413 Request Entity Too Large", {}, "Too large") end return end @body << chunk end |
#on_close ⇒ Object
364 365 366 |
# File 'lib/fluent/plugin/in_http.rb', line 364 def on_close @km.delete(self) end |
#on_headers_complete(headers) ⇒ Object
381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 |
# File 'lib/fluent/plugin/in_http.rb', line 381 def on_headers_complete(headers) expect = nil size = nil if @parser.http_version == [1, 1] @keep_alive = true else @keep_alive = false end @env = {} @content_type = "" @content_encoding = "" headers.each_pair {|k,v| @env["HTTP_#{k.tr('-','_').upcase}"] = v case k when /\AExpect\z/i expect = v when /\AContent-Length\Z/i size = v.to_i when /\AContent-Type\Z/i @content_type = v when /\AContent-Encoding\Z/i @content_encoding = v when /\AConnection\Z/i if /close/i.match?(v) @keep_alive = false elsif /Keep-alive/i.match?(v) @keep_alive = true end when /\AOrigin\Z/i @origin = v when /\AX-Forwarded-For\Z/i # For multiple X-Forwarded-For headers. Use first header value. v = v.first if v.is_a?(Array) @remote_addr = v.split(",").first when /\AAccess-Control-Request-Method\Z/i @access_control_request_method = v when /\AAccess-Control-Request-Headers\Z/i @access_control_request_headers = v end } if expect if expect == '100-continue'.freeze if !size || size < @body_size_limit send_response_nobody("100 Continue", {}) else send_response_and_close("413 Request Entity Too Large", {}, "Too large") end else send_response_and_close("417 Expectation Failed", {}, "") end end end |
#on_message_begin ⇒ Object
377 378 379 |
# File 'lib/fluent/plugin/in_http.rb', line 377 def @body = '' end |
#on_message_complete ⇒ Object
487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 |
# File 'lib/fluent/plugin/in_http.rb', line 487 def return if closing? if @parser.http_method == 'GET'.freeze return handle_get_request() end if @parser.http_method == 'OPTIONS'.freeze return () end # CORS check # ========== # For every incoming request, we check if we have some CORS # restrictions and allow listed origins through @cors_allow_origins. unless @cors_allow_origins.nil? unless @cors_allow_origins.include?('*') || include_cors_allow_origin send_response_and_close(RES_403_STATUS, {'Connection' => 'close'}, "") return end end # Content Encoding # ================= # Decode payload according to the "Content-Encoding" header. # For now, we only support 'gzip' and 'deflate'. begin if @content_encoding == 'gzip'.freeze @body = Zlib::GzipReader.new(StringIO.new(@body)).read elsif @content_encoding == 'deflate'.freeze @body = Zlib::Inflate.inflate(@body) end rescue @log.warn 'fails to decode payload', error: $!.to_s send_response_and_close(RES_400_STATUS, {}, "") return end @env['REMOTE_ADDR'] = @remote_addr if @remote_addr uri = URI.parse(@parser.request_url) params = WEBrick::HTTPUtils.parse_query(uri.query) if @format_name != 'default' params[EVENT_RECORD_PARAMETER] = @body elsif /^application\/x-www-form-urlencoded/.match?(@content_type) params.update WEBrick::HTTPUtils.parse_query(@body) elsif @content_type =~ /^multipart\/form-data; boundary=(.+)/ boundary = WEBrick::HTTPUtils.dequote($1) params.update WEBrick::HTTPUtils.parse_form_data(@body, boundary) elsif /^application\/json/.match?(@content_type) params['json'] = @body elsif /^application\/csp-report/.match?(@content_type) params['json'] = @body elsif /^application\/msgpack/.match?(@content_type) params['msgpack'] = @body elsif /^application\/x-ndjson/.match?(@content_type) params['ndjson'] = @body end path_info = uri.path if (@add_query_params) query_params = WEBrick::HTTPUtils.parse_query(uri.query) query_params.each_pair {|k,v| params["QUERY_#{k.tr('-','_').upcase}"] = v } end params.merge!(@env) @env.clear code, header, body = @callback.call(path_info, params) body = body.to_s header = header.dup if header.frozen? unless @cors_allow_origins.nil? if @cors_allow_origins.include?('*') header['Access-Control-Allow-Origin'] = '*' elsif include_cors_allow_origin header['Access-Control-Allow-Origin'] = @origin if @cors_allow_credentials header["Access-Control-Allow-Credentials"] = true end end end if @keep_alive header['Connection'] = 'Keep-Alive'.freeze send_response(code, header, body) else send_response_and_close(code, header, body) end end |
#on_read(data) ⇒ Object
368 369 370 371 372 373 374 375 |
# File 'lib/fluent/plugin/in_http.rb', line 368 def on_read(data) @idle = 0 @parser << data rescue @log.warn "unexpected error", error: $!.to_s @log.warn_backtrace @io.close end |
#on_write_complete ⇒ Object
588 589 590 |
# File 'lib/fluent/plugin/in_http.rb', line 588 def on_write_complete @io.close if @next_close end |
#send_response(code, header, body) ⇒ Object
601 602 603 604 605 606 607 608 609 610 611 612 613 |
# File 'lib/fluent/plugin/in_http.rb', line 601 def send_response(code, header, body) header['Content-Length'] ||= body.bytesize header['Content-Type'] ||= 'text/plain'.freeze data = %[HTTP/1.1 #{code}\r\n] header.each_pair {|k,v| data << "#{k}: #{v}\r\n" } data << "\r\n".freeze @io.write(data) @io.write(body) end |
#send_response_and_close(code, header, body) ⇒ Object
592 593 594 595 |
# File 'lib/fluent/plugin/in_http.rb', line 592 def send_response_and_close(code, header, body) send_response(code, header, body) @next_close = true end |
#send_response_nobody(code, header) ⇒ Object
615 616 617 618 619 620 621 622 |
# File 'lib/fluent/plugin/in_http.rb', line 615 def send_response_nobody(code, header) data = %[HTTP/1.1 #{code}\r\n] header.each_pair {|k,v| data << "#{k}: #{v}\r\n" } data << "\r\n".freeze @io.write(data) end |
#step_idle ⇒ Object
360 361 362 |
# File 'lib/fluent/plugin/in_http.rb', line 360 def step_idle @idle += 1 end |