Class: Fluent::Plugin::HttpForwardOutput
- Inherits: Output
  - Object
  - Output
  - Fluent::Plugin::HttpForwardOutput
- Defined in: lib/fluent/plugin/out_http_forward.rb
Instance Method Summary
- #configure(config) ⇒ NilClass
  Initialize attributes and parameters.
- #configure_params(config) ⇒ NilClass
  Configure plugin parameters.
- #execute_chunking(tag, es, enqueue: false) ⇒ NilClass
  Enforce the usage of the MsgPack streaming method internally so as to ensure that we have a consistent buffering mechanism.
- #format_body(tag, time, chunk) ⇒ String
  Format the Body for a given request depending on the Serializing format.
- #format_headers(tag, time, data) ⇒ Hash
  Format the Headers for a given request.
- #format_url(tag, time, data) ⇒ String
  Format the URL for a given request, and optionally replace the tag placeholder in the URI.
- #initialize ⇒ NilClass (constructor)
  Initialize new output plugin.
- #shutdown ⇒ NilClass
  Tear down the plugin.
- #start ⇒ NilClass
  Prepare the plugin event loop.
- #start_connection ⇒ HTTP::Client
  Prepare the HTTP client object which provides a baseline for future request objects.
- #try_write(chunk) ⇒ Nil
  Process a chunk of records asynchronously, committing successful uploads and re-queueing failures.
- #upload(tag, time, content) ⇒ Response
  Upload the request body to the remote end point.
- #write(chunk) ⇒ Nil
  Process a chunk of records synchronously, committing successful uploads and re-queueing failures.
Constructor Details
#initialize ⇒ NilClass
Initialize new output plugin
# File 'lib/fluent/plugin/out_http_forward.rb', line 38
def initialize
  super
  require 'http'
end
Instance Method Details
#configure(config) ⇒ NilClass
Initialize attributes and parameters
# File 'lib/fluent/plugin/out_http_forward.rb', line 46
def configure(config)
  super
  configure_params(config)
end
#configure_params(config) ⇒ NilClass
Configure plugin parameters
# File 'lib/fluent/plugin/out_http_forward.rb', line 55
def configure_params(config)
  unless config.key?("content_type")
    @content_type = case @format['@type']
                    when 'json' then 'application/json'
                    when 'msgpack' then 'application/x-msgpack'
                    else nil
                    end
  end
end
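The defaulting rule in configure_params can be tried in isolation. The following is a minimal sketch, not the plugin's code: `resolve_content_type` is a hypothetical helper, and returning the configured value when `content_type` is present is an assumption about what the inherited parameter handling does.

```ruby
# Hypothetical helper mirroring the case expression in configure_params.
# Assumption: when "content_type" is configured, that value is used as-is.
def resolve_content_type(config, format_type)
  return config['content_type'] if config.key?('content_type')

  case format_type
  when 'json' then 'application/json'
  when 'msgpack' then 'application/x-msgpack'
  end # any other format falls through and yields nil
end

json_default    = resolve_content_type({}, 'json')
msgpack_default = resolve_content_type({}, 'msgpack')
unknown_default = resolve_content_type({}, 'csv')
explicit        = resolve_content_type({ 'content_type' => 'text/plain' }, 'json')

puts json_default
puts msgpack_default
puts unknown_default.inspect
puts explicit
```

A nil result matters later: start_connection only adds a Content-Type header when @content_type is set.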
#execute_chunking(tag, es, enqueue: false) ⇒ NilClass
Enforce the usage of the MsgPack streaming method internally so as to ensure that we have a consistent buffering mechanism.
# File 'lib/fluent/plugin/out_http_forward.rb', line 110
def execute_chunking(tag, es, enqueue: false)
  return handle_stream_with_standard_format(tag, es, enqueue: enqueue)
end
#format_body(tag, time, chunk) ⇒ String
Format the Body for a given request depending on the Serializing format.
# File 'lib/fluent/plugin/out_http_forward.rb', line 133
def format_body(tag, time, chunk)
  case @format["@type"]
  when "json"
    chunk.extend Fluent::PluginMixin::ChunkJsonStreamer
    serializer_proc = chunk.method(:to_json_stream)
  else
    serializer_proc = chunk.method(:to_msgpack_stream)
  end
  body = serializer_proc.call
end
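format_body picks a serializer by extending the chunk object at runtime and then grabbing a bound method. The sketch below illustrates that pattern with stand-ins only: `JsonStreamer` and `FakeChunk` are hypothetical, and the newline-delimited JSON output is an assumption, not a statement about what Fluent::PluginMixin::ChunkJsonStreamer emits.

```ruby
require 'json'

# Hypothetical mixin standing in for Fluent::PluginMixin::ChunkJsonStreamer.
module JsonStreamer
  def to_json_stream
    records.map { |record| JSON.generate(record) }.join("\n")
  end
end

# Hypothetical chunk stand-in; the placeholder string is not real MsgPack.
class FakeChunk
  attr_reader :records

  def initialize(records)
    @records = records
  end

  def to_msgpack_stream
    "<msgpack:#{records.size} records>"
  end
end

def format_body(chunk, format_type)
  serializer =
    if format_type == 'json'
      chunk.extend(JsonStreamer) # adds to_json_stream to this one object only
      chunk.method(:to_json_stream)
    else
      chunk.method(:to_msgpack_stream)
    end
  serializer.call
end

json_body = format_body(FakeChunk.new([{ 'message' => 'hello' }, { 'message' => 'world' }]), 'json')
msgpack_body = format_body(FakeChunk.new([{ 'a' => 1 }]), 'msgpack')

puts json_body
puts msgpack_body
```

Because `extend` modifies a single object, chunks that never pass through the JSON branch do not gain `to_json_stream`.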
#format_headers(tag, time, data) ⇒ Hash
Format the Headers for a given request.
# File 'lib/fluent/plugin/out_http_forward.rb', line 125
def format_headers(tag, time, data)
  @headers
end
#format_url(tag, time, data) ⇒ String
Format the URL for a given request, and optionally replace the tag placeholder in the URI.
# File 'lib/fluent/plugin/out_http_forward.rb', line 118
def format_url(tag, time, data)
  @url.sub(/%{tag}/, tag)
end
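Because format_url uses `sub`, only the first `%{tag}` placeholder in the URL is replaced. A small sketch of the difference between `sub` and `gsub` follows; the endpoint and tag value are hypothetical, and a String pattern is used here in place of the regular expression so the match is plainly literal.

```ruby
# Hypothetical URL template containing the placeholder twice.
url_template = 'https://logs.example.com/ingest/%{tag}?source=%{tag}'

# String#sub replaces only the first occurrence of the pattern.
first_only = url_template.sub('%{tag}', 'app.access')

# String#gsub replaces every occurrence.
every = url_template.gsub('%{tag}', 'app.access')

puts first_only
puts every
```

If a URL template is expected to carry the tag in more than one place, `gsub` would be the safer choice; with a single placeholder the two behave identically.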
#shutdown ⇒ NilClass
Tear down the plugin
# File 'lib/fluent/plugin/out_http_forward.rb', line 101
def shutdown
  @connection.close
  super
end
#start ⇒ NilClass
Prepare the plugin event loop
# File 'lib/fluent/plugin/out_http_forward.rb', line 68
def start
  super
  start_connection
end
#start_connection ⇒ HTTP::Client
Prepare the HTTP client object which provides a baseline for future request objects.
# File 'lib/fluent/plugin/out_http_forward.rb', line 77
def start_connection
  @connection = HTTP::Client.new(
    :headers => @headers
  )

  if @authentication['method']
    @connection = @connection.basic_auth(
      :user => @authentication['username'],
      :pass => @authentication['password']
    )
  end

  if @content_type
    @connection = @connection.headers(
      'Content-Type' => @content_type
    )
  end
  @connection
end
#try_write(chunk) ⇒ Nil
Process a chunk of records asynchronously, committing successful uploads and re-queueing failures.
# File 'lib/fluent/plugin/out_http_forward.rb', line 176
def try_write(chunk)
  write(chunk)
end
#upload(tag, time, content) ⇒ Response
Upload the request body to the remote end point
# File 'lib/fluent/plugin/out_http_forward.rb', line 148
def upload(tag, time, content)
  headers = format_headers(tag, nil, content)
  url = format_url(tag, nil, content)
  @connection.request(@verb, url, body: content, headers: headers)
end
#write(chunk) ⇒ Nil
Process a chunk of records synchronously, committing successful uploads and re-queueing failures.
# File 'lib/fluent/plugin/out_http_forward.rb', line 159
def write(chunk)
  tag = chunk.metadata.tag
  content = format_body(tag, nil, chunk)
  response = upload(tag, nil, content)

  if response.code <= 299
    commit_write(chunk.unique_id)
  else
    @log.warn "failed to flush buffer", code: response.code
  end
end
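The success test in write, `response.code <= 299`, also accepts 1xx informational codes. As a hedged sketch of a narrower check, the snippet below treats only 2xx as success. `Response` and `success?` are hypothetical stand-ins for illustration and are not part of the plugin or of the HTTP client library.

```ruby
# Hypothetical response stand-in exposing only a numeric status code.
Response = Struct.new(:code)

# True only for 2xx status codes.
def success?(response)
  (200..299).cover?(response.code)
end

results = [100, 200, 204, 301, 500].map do |code|
  [code, success?(Response.new(code))]
end

results.each { |code, ok| puts "#{code}: #{ok ? 'commit' : 'warn'}" }
```

Whether redirects or informational responses should count as delivered depends on the receiving endpoint, so the range is a design choice rather than a fixed rule.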