Class: Fluent::Plugin::HTTPOutput
- Defined in:
- lib/fluent/plugin/out_http.rb
Defined Under Namespace
Classes: ConnectionCache, RetryableResponse
Constant Summary
Constants inherited from Output
Output::BUFFER_STATS_KEYS, Output::CHUNKING_FIELD_WARN_NUM, Output::CHUNK_ID_PLACEHOLDER_PATTERN, Output::CHUNK_KEY_PATTERN, Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Output::FORMAT_MSGPACK_STREAM, Output::FORMAT_MSGPACK_STREAM_TIME_INT, Output::TIMESTAMP_CHECK_BASE_TIME, Output::TIME_KEY_PLACEHOLDER_THRESHOLDS, Output::UNRECOVERABLE_ERRORS
Constants included from Configurable
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary
Attributes inherited from Output
#as_secondary, #buffer, #chunk_key_accessors, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #secondary, #timekey_zone
Attributes included from Fluent::PluginLoggerMixin
Attributes inherited from Base
Instance Method Summary collapse
- #close ⇒ Object
- #configure(conf) ⇒ Object
- #connection_cache_id_for_thread ⇒ Object
- #connection_cache_id_for_thread=(id) ⇒ Object
- #connection_cache_id_thread_key ⇒ Object
- #format(tag, time, record) ⇒ Object
- #format_json_array(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary? ⇒ Boolean
-
#initialize ⇒ HTTPOutput
constructor
A new instance of HTTPOutput.
- #multi_workers_ready? ⇒ Boolean
- #write(chunk) ⇒ Object
Methods inherited from Output
#acts_as_secondary, #actual_flush_thread_count, #after_shutdown, #after_start, #backup_chunk, #before_shutdown, #calculate_timekey, #check_slow_flush, #chunk_for_test, #commit_write, #emit_buffered, #emit_count, #emit_events, #emit_records, #emit_size, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #extract_placeholders, #flush_thread_run, #flush_thread_wakeup, #force_flush, #formatted_to_msgpack_binary, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_limit_reached, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #keep_buffer_config_compat, #log_retry_error, #metadata, #next_flush_time, #num_errors, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_count, #rollback_write, #shutdown, #start, #statistics, #stop, #submit_flush_all, #submit_flush_once, #support_in_v12_style?, #synchronize_in_threads, #synchronize_path, #synchronize_path_in_workers, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #update_retry_state, #write_count, #write_guard
Methods included from UniqueId::Mixin
#dump_unique_id_hex, #generate_unique_id
Methods included from Fluent::PluginHelper::Mixin
Methods included from Fluent::PluginLoggerMixin
Methods included from Fluent::PluginId
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop
Methods inherited from Base
#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #start, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Configurable
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
#initialize ⇒ HTTPOutput
Returns a new instance of HTTPOutput.
121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/fluent/plugin/out_http.rb', line 121 def initialize super @uri = nil @proxy_uri = nil @formatter = nil @connection_cache = [] @connection_cache_id_mutex = Mutex.new @connection_cache_next_id = 0 end |
Instance Method Details
#close ⇒ Object
133 134 135 136 137 |
# File 'lib/fluent/plugin/out_http.rb', line 133 def close super @connection_cache.each {|entry| entry.conn.finish if entry.conn&.started? } end |
#configure(conf) ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/fluent/plugin/out_http.rb', line 139 def configure(conf) super @connection_cache = Array.new(actual_flush_thread_count, ConnectionCache.new("", nil)) if @reuse_connections if @retryable_response_codes.nil? log.warn('Status code 503 is going to be removed from default `retryable_response_codes` from fluentd v2. Please add it by yourself if you wish') @retryable_response_codes = [503] end @http_opt = setup_http_option @proxy_uri = URI.parse(@proxy) if @proxy @formatter = formatter_create @content_type = setup_content_type unless @content_type if @json_array if @formatter_configs.first[:@type] != "json" raise Fluent::ConfigError, "json_array option could be used with json formatter only" end define_singleton_method(:format, method(:format_json_array)) end if @auth and @auth.method == :aws_sigv4 begin require 'aws-sigv4' require 'aws-sdk-core' rescue LoadError raise Fluent::ConfigError, "The aws-sdk-core and aws-sigv4 gems are required for aws_sigv4 auth. Run: gem install aws-sdk-core -v '~> 3.191'" end raise Fluent::ConfigError, "aws_service is required for aws_sigv4 auth" unless @auth.aws_service != nil raise Fluent::ConfigError, "aws_region is required for aws_sigv4 auth" unless @auth.aws_region != nil if @auth.aws_role_arn == nil aws_credentials = Aws::CredentialProviderChain.new.resolve else aws_credentials = Aws::AssumeRoleCredentials.new( client: Aws::STS::Client.new( region: @auth.aws_region ), role_arn: @auth.aws_role_arn, role_session_name: "fluentd" ) end @aws_signer = Aws::Sigv4::Signer.new( service: @auth.aws_service, region: @auth.aws_region, credentials_provider: aws_credentials ) end end |
#connection_cache_id_for_thread ⇒ Object
113 114 115 |
# File 'lib/fluent/plugin/out_http.rb', line 113 def connection_cache_id_for_thread Thread.current[connection_cache_id_thread_key] end |
#connection_cache_id_for_thread=(id) ⇒ Object
117 118 119 |
# File 'lib/fluent/plugin/out_http.rb', line 117 def connection_cache_id_for_thread=(id) Thread.current[connection_cache_id_thread_key] = id end |
#connection_cache_id_thread_key ⇒ Object
109 110 111 |
# File 'lib/fluent/plugin/out_http.rb', line 109 def connection_cache_id_thread_key "#{plugin_id}_connection_cache_id" end |
#format(tag, time, record) ⇒ Object
200 201 202 |
# File 'lib/fluent/plugin/out_http.rb', line 200 def format(tag, time, record) @formatter.format(tag, time, record) end |
#format_json_array(tag, time, record) ⇒ Object
204 205 206 |
# File 'lib/fluent/plugin/out_http.rb', line 204 def format_json_array(tag, time, record) @formatter.format(tag, time, record) << "," end |
#formatted_to_msgpack_binary? ⇒ Boolean
196 197 198 |
# File 'lib/fluent/plugin/out_http.rb', line 196 def formatted_to_msgpack_binary? @formatter_configs.first[:@type] == 'msgpack' end |
#multi_workers_ready? ⇒ Boolean
192 193 194 |
# File 'lib/fluent/plugin/out_http.rb', line 192 def multi_workers_ready? true end |
#write(chunk) ⇒ Object
208 209 210 211 212 213 214 215 |
# File 'lib/fluent/plugin/out_http.rb', line 208 def write(chunk) uri = parse_endpoint(chunk) req = create_request(chunk, uri) log.debug { "#{@http_method.capitalize} data to #{uri.to_s} with chunk(#{dump_unique_id_hex(chunk.unique_id)})" } send_request(uri, req) end |