Class: Fluent::Plugin::ForwardOutput
- Inherits:
-
Output
show all
- Defined in:
- lib/fluent/plugin/out_forward.rb,
lib/fluent/plugin/out_forward/error.rb,
lib/fluent/plugin/out_forward/ack_handler.rb,
lib/fluent/plugin/out_forward/socket_cache.rb,
lib/fluent/plugin/out_forward/load_balancer.rb,
lib/fluent/plugin/out_forward/failure_detector.rb,
lib/fluent/plugin/out_forward/connection_manager.rb,
lib/fluent/plugin/out_forward/handshake_protocol.rb
Defined Under Namespace
Classes: AckHandler, ConnectionClosedError, ConnectionManager, Error, FailureDetector, HandshakeError, HandshakeProtocol, HeloError, LoadBalancer, NoNodesAvailable, Node, NoneHeartbeatNode, PingpongError, SocketCache
Constant Summary
collapse
- LISTEN_PORT =
24224
- FORWARD_HEADER =
[0x93].pack('C').freeze
Constants inherited
from Output
Output::BUFFER_STATS_KEYS, Output::CHUNKING_FIELD_WARN_NUM, Output::CHUNK_ID_PLACEHOLDER_PATTERN, Output::CHUNK_KEY_PATTERN, Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Output::FORMAT_MSGPACK_STREAM, Output::FORMAT_MSGPACK_STREAM_TIME_INT, Output::TIMESTAMP_CHECK_BASE_TIME, Output::TIME_KEY_PLACEHOLDER_THRESHOLDS, Output::UNRECOVERABLE_ERRORS
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
Attributes inherited from Output
#as_secondary, #buffer, #chunk_key_accessors, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #secondary, #timekey_zone
#log
Attributes inherited from Base
#under_plugin_development
Instance Method Summary
collapse
Methods inherited from Output
#acts_as_secondary, #actual_flush_thread_count, #after_start, #backup_chunk, #calculate_timekey, #check_slow_flush, #chunk_for_test, #commit_write, #emit_buffered, #emit_count, #emit_events, #emit_records, #emit_size, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #extract_placeholders, #flush_thread_run, #flush_thread_wakeup, #force_flush, #format, #formatted_to_msgpack_binary, #formatted_to_msgpack_binary?, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_limit_reached, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #keep_buffer_config_compat, #log_retry_error, #metadata, #next_flush_time, #num_errors, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #process, #retry_state, #rollback_count, #rollback_write, #shutdown, #submit_flush_all, #submit_flush_once, #support_in_v12_style?, #synchronize_in_threads, #synchronize_path, #synchronize_path_in_workers, #terminate, #try_rollback_all, #try_rollback_write, #update_retry_state, #write_count, #write_guard
#dump_unique_id_hex, #generate_unique_id
included
included, #terminate
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir
Methods inherited from Base
#acquire_worker_lock, #after_shutdown?, #after_start, #after_started?, #before_shutdown?, #called_in_test?, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stopped?, #string_safe_encoding, #terminate, #terminated?
#system_config, #system_config_override
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
Returns a new instance of ForwardOutput.
160
161
162
163
164
165
166
167
168
169
170
171
172
|
# File 'lib/fluent/plugin/out_forward.rb', line 160
# Sets up the instance state this plugin relies on before #configure runs.
#
# Each instance variable is assigned on its own line: the previous text had
# two assignments fused onto a single line in two places, which Ruby cannot
# parse.
def initialize
  super

  # Destination nodes; #configure appends every discovered service here.
  @nodes = []
  @loop = nil
  @thread = nil
  # UDP heartbeat socket; created in #start (udp heartbeat only), closed in #close.
  @usock = nil
  # Interval handed to the keepalive socket watcher timer in #start
  # (presumably seconds — confirm against timer_execute).
  @keep_alive_watcher_interval = 5
  # Raised by #before_shutdown; #try_flush and #try_write consult it.
  @suspend_flush = false
  # Both metrics objects are created at the end of #configure.
  @healthy_nodes_count_metrics = nil
  @registered_nodes_count_metrics = nil
end
|
Instance Attribute Details
#nodes ⇒ Object
Returns the value of attribute nodes.
149
150
151
|
# File 'lib/fluent/plugin/out_forward.rb', line 149
# Reader for @nodes: the array created empty in #initialize and filled in
# #configure with every service returned by service_discovery_services.
def nodes
@nodes
end
|
#read_interval ⇒ Object
Returns the value of attribute read_interval.
158
159
160
|
# File 'lib/fluent/plugin/out_forward.rb', line 158
# Reader for @read_interval, which #configure derives as
# @read_interval_msec / 1000.0 (a Float; presumably seconds).
def read_interval
@read_interval
end
|
#recover_sample_size ⇒ Object
Returns the value of attribute recover_sample_size.
158
159
160
|
# File 'lib/fluent/plugin/out_forward.rb', line 158
# Reader for @recover_sample_size, which #configure derives as
# @recover_wait / @heartbeat_interval.
def recover_sample_size
@recover_sample_size
end
|
Instance Method Details
#after_shutdown ⇒ Object
346
347
348
349
|
# File 'lib/fluent/plugin/out_forward.rb', line 346
# Shutdown hook: when ack responses are required, runs one final #last_ack
# pass before delegating to the parent implementation.
#
# Returns whatever the parent hook returns.
def after_shutdown
  if @require_ack_response
    last_ack
  end

  super
end
|
#before_shutdown ⇒ Object
341
342
343
344
|
# File 'lib/fluent/plugin/out_forward.rb', line 341
# Shutdown hook: runs the parent hook first, then raises @suspend_flush.
# #try_flush skips flushing while this flag is set and ack responses are
# required; #try_write runs #last_ack under the same condition.
def before_shutdown
super
@suspend_flush = true
end
|
#close ⇒ Object
324
325
326
327
328
329
330
331
|
# File 'lib/fluent/plugin/out_forward.rb', line 324
# Closes the UDP heartbeat socket (if one was created) and then delegates to
# the parent implementation.
#
# Errors raised while closing the socket are deliberately ignored
# (StandardError only, matching the `rescue` modifier semantics).
#
# Returns whatever the parent #close returns.
def close
  if @usock
    begin
      @usock.close
    rescue StandardError
      nil
    end
  end

  super
end
|
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
|
# File 'lib/fluent/plugin/out_forward.rb', line 174
# Validates the plugin configuration and builds the collaborators used at
# runtime: the optional ack handler, the connection manager (with an optional
# socket cache), service discovery, the node list, and two node-count metrics.
#
# @param conf the configuration element passed to the parent #configure
# @raise [Fluent::ConfigError] when 'tag' is not a buffer chunk key, when
#   dns_round_robin is combined with udp heartbeats, when a TLS certificate
#   path is missing or unreadable, when Windows-only certificate-store options
#   are misused, when no node is configured, or when ack_response_timeout < 1
def configure(conf)
# Convert legacy-style buffer parameters; 'tag' is the default chunk key.
compat_parameters_convert(conf, :buffer, default_chunk_key: 'tag')
super
unless @chunk_key_tag
raise Fluent::ConfigError, "buffer chunk key must include 'tag' for forward output"
end
# Milliseconds -> fractional value (divided by 1000.0).
@read_interval = @read_interval_msec / 1000.0
@recover_sample_size = @recover_wait / @heartbeat_interval
# :tcp is accepted as a deprecated alias and rewritten to :transport.
if @heartbeat_type == :tcp
log.warn "'heartbeat_type tcp' is deprecated. use 'transport' instead."
@heartbeat_type = :transport
end
if @dns_round_robin && @heartbeat_type == :udp
raise Fluent::ConfigError, "forward output heartbeat type must be 'transport' or 'none' to use dns_round_robin option"
end
if @transport == :tls
# A non-empty tls_cert_path takes the place of tls_ca_cert_path.
if @tls_cert_path && !@tls_cert_path.empty?
@tls_ca_cert_path = @tls_cert_path
end
# Every configured certificate path must exist and be readable.
if @tls_ca_cert_path && !@tls_ca_cert_path.empty?
@tls_ca_cert_path.each do |path|
raise Fluent::ConfigError, "specified cert path does not exist:#{path}" unless File.exist?(path)
raise Fluent::ConfigError, "specified cert path is not readable:#{path}" unless File.readable?(path)
end
end
# Insecure mode turns hostname verification off and allows self-signed certs.
if @tls_insecure_mode
log.warn "TLS transport is configured in insecure way"
@tls_verify_hostname = false
@tls_allow_self_signed_cert = true
end
# Certificate-store options: exclusive with cert paths on Windows, rejected elsewhere.
if Fluent.windows?
if (@tls_cert_path || @tls_ca_cert_path) && @tls_cert_logical_store_name
raise Fluent::ConfigError, "specified both cert path and tls_cert_logical_store_name is not permitted"
end
else
raise Fluent::ConfigError, "This parameter is for only Windows" if @tls_cert_logical_store_name
raise Fluent::ConfigError, "This parameter is for only Windows" if @tls_cert_thumbprint
end
end
# The ack handler exists only when ack responses are required.
@ack_handler = @require_ack_response ? AckHandler.new(timeout: @ack_response_timeout, log: @log, read_length: @read_length) : nil
# Sockets are cached only when keepalive is enabled.
socket_cache = @keepalive ? SocketCache.new(@keepalive_timeout, @log) : nil
@connection_manager = ConnectionManager.new(
log: @log,
secure: !!@security,
connection_factory: method(:create_transfer_socket),
socket_cache: socket_cache,
)
service_discovery_configure(
:out_forward_service_discovery_watcher,
static_default_service_directive: 'server',
load_balancer: LoadBalancer.new(log),
custom_build_method: method(:build_node),
)
# Register every discovered service as a node. Unless heartbeats are
# disabled, host resolution is checked; a failure is re-raised unless
# ignore_network_errors_at_startup is set, in which case the node is
# logged and disabled.
service_discovery_services.each do |server|
@nodes << server
unless @heartbeat_type == :none
begin
server.validate_host_resolution!
rescue => e
raise unless @ignore_network_errors_at_startup
log.warn "failed to resolve node name when configured", server: (server.name || server.host), error: e
server.disable!
end
end
end
# When not acting as a secondary output, align buffer compression with
# the plugin's compress setting (or tell the user when only the buffer is gzip).
unless @as_secondary
if @compress == :gzip && @buffer.compress == :text
@buffer.compress = :gzip
elsif @compress == :text && @buffer.compress == :gzip
log.info "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>"
end
end
if service_discovery_services.empty?
raise Fluent::ConfigError, "forward output plugin requires at least one node is required. Add <server> or <service_discovery>"
end
if !@keepalive && @keepalive_timeout
log.warn('The value of keepalive_timeout is ignored. if you want to use keepalive, please add `keepalive true` to your conf.')
end
# NOTE(review): this check runs after AckHandler was already built with the same timeout.
raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1
@healthy_nodes_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "healthy_nodes_count", help_text: "Number of count healthy nodes", prefer_gauge: true)
@registered_nodes_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "registered_nodes_count", help_text: "Number of count registered nodes", prefer_gauge: true)
end
|
#create_transfer_socket(host, port, hostname, &block) ⇒ Object
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
|
# File 'lib/fluent/plugin/out_forward.rb', line 379
# Opens a transfer socket to a destination using the configured transport.
#
# This method is handed to ConnectionManager as its connection factory in
# #configure.
#
# @param host destination address passed to the socket helper
# @param port destination port passed to the socket helper
# @param hostname name used as the FQDN for TLS verification (TLS only)
# @param block forwarded unchanged to the socket helper
# @return whatever socket_create_tls / socket_create_tcp returns
# @raise [RuntimeError] when @transport is neither :tls nor :tcp
def create_transfer_socket(host, port, hostname, &block)
  case @transport
  when :tls
    socket_create_tls(
      host, port,
      version: @tls_version,
      ciphers: @tls_ciphers,
      insecure: @tls_insecure_mode,
      verify_fqdn: @tls_verify_hostname,
      fqdn: hostname,
      allow_self_signed_cert: @tls_allow_self_signed_cert,
      cert_paths: @tls_ca_cert_path,
      cert_path: @tls_client_cert_path,
      private_key_path: @tls_client_private_key_path,
      private_key_passphrase: @tls_client_private_key_passphrase,
      cert_thumbprint: @tls_cert_thumbprint,
      cert_logical_store_name: @tls_cert_logical_store_name,
      cert_use_enterprise_store: @tls_cert_use_enterprise_store,
      # No linger timeout is requested on Windows.
      linger_timeout: Fluent.windows? ? nil : @send_timeout,
      send_timeout: @send_timeout,
      recv_timeout: @ack_response_timeout,
      connect_timeout: @connect_timeout,
      &block
    )
  when :tcp
    socket_create_tcp(
      host, port,
      # Same Windows rule as the TLS branch. The TCP branch used to pass
      # @send_timeout unconditionally, so the two transports disagreed.
      # NOTE(review): presumably lingering sockets are undesirable on Windows
      # regardless of transport — confirm against the socket helper.
      linger_timeout: Fluent.windows? ? nil : @send_timeout,
      send_timeout: @send_timeout,
      recv_timeout: @ack_response_timeout,
      connect_timeout: @connect_timeout,
      &block
    )
  else
    raise "BUG: unknown transport protocol #{@transport}"
  end
end
|
#forward_header ⇒ Object
445
446
447
|
# File 'lib/fluent/plugin/out_forward.rb', line 445
# Returns the FORWARD_HEADER constant.
#
# The method name was missing from this listing, which left a definition
# named after the constant with an empty body. The constant summary shows the
# value as `[0x93].pack('C').freeze`, i.e. a frozen one-byte string
# (presumably the MessagePack fixarray-of-3 marker — confirm).
def forward_header
  FORWARD_HEADER
end
|
#last_ack ⇒ Object
356
357
358
359
|
# File 'lib/fluent/plugin/out_forward.rb', line 356
# Runs one ack check pass: first refreshes the delayed-commit timeout, then
# calls ack_check with the interval reported by ack_select_interval.
#
# Returns whatever ack_check returns.
def last_ack
  overwrite_delayed_commit_timeout

  select_interval = ack_select_interval
  ack_check(select_interval)
end
|
#multi_workers_ready? ⇒ Boolean
275
276
277
|
# File 'lib/fluent/plugin/out_forward.rb', line 275
# Always returns true (presumably declaring that this plugin can run under
# multiple workers — confirm against the Output base class contract).
def multi_workers_ready?
true
end
|
#overwrite_delayed_commit_timeout ⇒ Object
283
284
285
286
287
288
289
290
|
# File 'lib/fluent/plugin/out_forward.rb', line 283
# Replaces @delayed_commit_timeout with @ack_response_timeout + 2 whenever the
# two settings differ, logging the override at info level.
#
# Returns the new timeout when it was overwritten, nil otherwise.
#
# NOTE(review): after an overwrite the two values still differ by 2, so every
# later call logs and reassigns again — confirm this is intended.
def overwrite_delayed_commit_timeout
  return if @delayed_commit_timeout == @ack_response_timeout

  log.info "delayed_commit_timeout is overwritten by ack_response_timeout"
  @delayed_commit_timeout = @ack_response_timeout + 2
end
|
#prefer_delayed_commit ⇒ Object
279
280
281
|
# File 'lib/fluent/plugin/out_forward.rb', line 279
# Returns the value of @require_ack_response (presumably telling the Output
# base class to use delayed commit, i.e. #try_write, when acks are required —
# confirm against the base class).
def prefer_delayed_commit
@require_ack_response
end
|
#start ⇒ Object
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
|
# File 'lib/fluent/plugin/out_forward.rb', line 292
# Starts the plugin's background activity after the parent #start:
#
# * heartbeats (unless heartbeat_type is :none) — for :udp a socket and a UDP
#   receiver are created first, then the heartbeat request timer is scheduled;
# * the ack reader thread, when ack responses are required;
# * an eager connection check of every service, when
#   verify_connection_at_startup is set;
# * the keepalive socket watcher timer, when keepalive is enabled.
#
# Raises Fluent::UnrecoverableError when the startup connection check fails
# with a StandardError (the failure is logged at fatal level first).
def start
  super

  if @heartbeat_type != :none
    if @heartbeat_type == :udp
      heartbeat_host = service_discovery_services.first.host
      heartbeat_port = service_discovery_services.first.port
      @usock = socket_create_udp(heartbeat_host, heartbeat_port, nonblock: true)
      server_create_udp(
        :out_forward_heartbeat_receiver, 0,
        socket: @usock,
        max_bytes: @read_length,
        &method(:on_udp_heatbeat_response_recv)
      )
    end

    timer_execute(:out_forward_heartbeat_request, @heartbeat_interval, &method(:on_heartbeat_timer))
  end

  if @require_ack_response
    overwrite_delayed_commit_timeout
    thread_create(:out_forward_receiving_ack, &method(:ack_reader))
  end

  if @verify_connection_at_startup
    service_discovery_services.each do |service|
      begin
        service.verify_connection
      rescue StandardError => error
        log.fatal "forward's connection setting error: #{error.message}"
        raise Fluent::UnrecoverableError, error.message
      end
    end
  end

  return unless @keepalive

  timer_execute(:out_forward_keep_alived_socket_watcher, @keep_alive_watcher_interval, &method(:on_purge_obsolete_socks))
end
|
#statistics ⇒ Object
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
|
# File 'lib/fluent/plugin/out_forward.rb', line 423
# Extends the parent's statistics with node counts.
#
# The registered-node metric is set to the number of discovered services and
# the healthy-node metric is reset to 0 and incremented once per service that
# reports available?.
#
# Returns a Hash with a single 'output' key: the parent's "output" entry
# merged with 'healthy_nodes_count' and 'registered_nodes_count'.
def statistics
  parent_stats = super
  discovered = service_discovery_services

  @healthy_nodes_count_metrics.set(0)
  @registered_nodes_count_metrics.set(discovered.size)
  discovered.each do |service|
    @healthy_nodes_count_metrics.inc if service.available?
  end

  output_stats = parent_stats["output"]
  node_counts = {
    'healthy_nodes_count' => @healthy_nodes_count_metrics.get,
    'registered_nodes_count' => @registered_nodes_count_metrics.get,
  }

  { 'output' => output_stats.merge(node_counts) }
end
|
#stop ⇒ Object
333
334
335
336
337
338
339
|
# File 'lib/fluent/plugin/out_forward.rb', line 333
# Stops the plugin: runs the parent #stop, then stops the connection manager
# when keepalive is enabled.
#
# Returns the connection manager's stop result, or nil without keepalive.
def stop
  super

  @connection_manager.stop if @keepalive
end
|
#try_flush ⇒ Object
351
352
353
354
|
# File 'lib/fluent/plugin/out_forward.rb', line 351
# Delegates to the parent #try_flush unless flushing is suspended, i.e. unless
# ack responses are required and @suspend_flush has been raised (see
# #before_shutdown).
#
# Returns the parent's result, or nil when the flush is skipped.
def try_flush
  flush_suspended = @require_ack_response && @suspend_flush

  super unless flush_suspended
end
|
#try_write(chunk) ⇒ Object
368
369
370
371
372
373
374
375
376
377
|
# File 'lib/fluent/plugin/out_forward.rb', line 368
# Sends a chunk to a node chosen by service discovery without committing it
# here (used together with ack responses; see #prefer_delayed_commit).
#
# An empty chunk is committed immediately via commit_write and nothing is
# sent. After sending, when ack responses are required and flushing is
# suspended, one #last_ack pass is run.
#
# @param chunk the buffer chunk to forward
# @return nil for an empty chunk or when no ack pass runs; otherwise the
#   result of #last_ack
def try_write(chunk)
  log.trace "writing a chunk to destination", chunk_id: dump_unique_id_hex(chunk.unique_id)

  if chunk.empty?
    commit_write(chunk.unique_id)
    return
  end

  chunk_tag = chunk.metadata.tag
  service_discovery_select_service do |destination|
    destination.send_data(chunk_tag, chunk)
  end

  return unless @require_ack_response && @suspend_flush

  last_ack
end
|
#write(chunk) ⇒ Object
361
362
363
364
365
366
|
# File 'lib/fluent/plugin/out_forward.rb', line 361
# Sends a chunk to a node chosen by service discovery.
#
# @param chunk the buffer chunk to forward
# @return nil for an empty chunk (nothing is sent); otherwise whatever
#   service_discovery_select_service returns
def write(chunk)
  unless chunk.empty?
    chunk_tag = chunk.metadata.tag
    service_discovery_select_service do |destination|
      destination.send_data(chunk_tag, chunk)
    end
  end
end
|