Class: Fluent::Plugin::ForwardOutput
- Inherits:
-
Output
show all
- Defined in:
- lib/fluent/plugin/out_forward.rb,
lib/fluent/plugin/out_forward/error.rb,
lib/fluent/plugin/out_forward/ack_handler.rb,
lib/fluent/plugin/out_forward/socket_cache.rb,
lib/fluent/plugin/out_forward/load_balancer.rb,
lib/fluent/plugin/out_forward/failure_detector.rb,
lib/fluent/plugin/out_forward/connection_manager.rb,
lib/fluent/plugin/out_forward/handshake_protocol.rb
Defined Under Namespace
Classes: AckHandler, ConnectionClosedError, ConnectionManager, Error, FailureDetector, HandshakeError, HandshakeProtocol, HeloError, LoadBalancer, NoNodesAvailable, Node, NoneHeartbeatNode, PingpongError, SocketCache
Constant Summary
collapse
- LISTEN_PORT =
24224
[0x93].pack('C').freeze
Constants inherited
from Output
Output::BUFFER_STATS_KEYS, Output::CHUNKING_FIELD_WARN_NUM, Output::CHUNK_ID_PLACEHOLDER_PATTERN, Output::CHUNK_KEY_PATTERN, Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Output::FORMAT_MSGPACK_STREAM, Output::FORMAT_MSGPACK_STREAM_TIME_INT, Output::TIMESTAMP_CHECK_BASE_TIME, Output::TIME_KEY_PLACEHOLDER_THRESHOLDS, Output::UNRECOVERABLE_ERRORS
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
Attributes inherited from Output
#as_secondary, #buffer, #chunk_key_accessors, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #emit_count, #emit_records, #num_errors, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #rollback_count, #secondary, #timekey_zone, #write_count
#log
Attributes inherited from Base
#under_plugin_development
Instance Method Summary
collapse
Methods inherited from Output
#acts_as_secondary, #after_start, #backup_chunk, #calculate_timekey, #check_slow_flush, #chunk_for_test, #commit_write, #emit_buffered, #emit_events, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #extract_placeholders, #flush_thread_run, #flush_thread_wakeup, #force_flush, #format, #formatted_to_msgpack_binary, #formatted_to_msgpack_binary?, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #metadata, #next_flush_time, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #process, #retry_state, #rollback_write, #shutdown, #submit_flush_all, #submit_flush_once, #support_in_v12_style?, #terminate, #try_rollback_all, #try_rollback_write, #update_retry_state, #write_guard
#dump_unique_id_hex, #generate_unique_id
included
included, #terminate
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir
Methods inherited from Base
#after_shutdown?, #after_start, #after_started?, #before_shutdown?, #called_in_test?, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #has_router?, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stopped?, #string_safe_encoding, #terminate, #terminated?
#system_config, #system_config_override
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
Returns a new instance of ForwardOutput.
160
161
162
163
164
165
166
167
168
169
170
|
# File 'lib/fluent/plugin/out_forward.rb', line 160
def initialize
super
@nodes = [] @loop = nil
@thread = nil
@usock = nil
@keep_alive_watcher_interval = 5 @suspend_flush = false
end
|
Instance Attribute Details
#nodes ⇒ Object
Returns the value of attribute nodes.
149
150
151
|
# File 'lib/fluent/plugin/out_forward.rb', line 149
def nodes
@nodes
end
|
#read_interval ⇒ Object
Returns the value of attribute read_interval.
158
159
160
|
# File 'lib/fluent/plugin/out_forward.rb', line 158
def read_interval
@read_interval
end
|
#recover_sample_size ⇒ Object
Returns the value of attribute recover_sample_size.
158
159
160
|
# File 'lib/fluent/plugin/out_forward.rb', line 158
def recover_sample_size
@recover_sample_size
end
|
Instance Method Details
#after_shutdown ⇒ Object
358
359
360
361
|
# File 'lib/fluent/plugin/out_forward.rb', line 358
def after_shutdown
last_ack if @require_ack_response
super
end
|
#before_shutdown ⇒ Object
353
354
355
356
|
# File 'lib/fluent/plugin/out_forward.rb', line 353
def before_shutdown
super
@suspend_flush = true
end
|
#close ⇒ Object
336
337
338
339
340
341
342
343
|
# File 'lib/fluent/plugin/out_forward.rb', line 336
def close
if @usock
@usock.close rescue nil
end
super
end
|
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
|
# File 'lib/fluent/plugin/out_forward.rb', line 172
def configure(conf)
compat_parameters_convert(conf, :buffer, default_chunk_key: 'tag')
super
unless @chunk_key_tag
raise Fluent::ConfigError, "buffer chunk key must include 'tag' for forward output"
end
@read_interval = @read_interval_msec / 1000.0
@recover_sample_size = @recover_wait / @heartbeat_interval
if @heartbeat_type == :tcp
log.warn "'heartbeat_type tcp' is deprecated. use 'transport' instead."
@heartbeat_type = :transport
end
if @dns_round_robin && @heartbeat_type == :udp
raise Fluent::ConfigError, "forward output heartbeat type must be 'transport' or 'none' to use dns_round_robin option"
end
if @transport == :tls
if @tls_cert_path && !@tls_cert_path.empty?
@tls_ca_cert_path = @tls_cert_path
end
if @tls_ca_cert_path && !@tls_ca_cert_path.empty?
@tls_ca_cert_path.each do |path|
raise Fluent::ConfigError, "specified cert path does not exist:#{path}" unless File.exist?(path)
raise Fluent::ConfigError, "specified cert path is not readable:#{path}" unless File.readable?(path)
end
end
if @tls_insecure_mode
log.warn "TLS transport is configured in insecure way"
@tls_verify_hostname = false
@tls_allow_self_signed_cert = true
end
if Fluent.windows?
if (@tls_cert_path || @tls_ca_cert_path) && @tls_cert_logical_store_name
raise Fluent::ConfigError, "specified both cert path and tls_cert_logical_store_name is not permitted"
end
else
raise Fluent::ConfigError, "This parameter is for only Windows" if @tls_cert_logical_store_name
raise Fluent::ConfigError, "This parameter is for only Windows" if @tls_cert_thumbprint
end
end
@ack_handler = @require_ack_response ? AckHandler.new(timeout: @ack_response_timeout, log: @log, read_length: @read_length) : nil
socket_cache = @keepalive ? SocketCache.new(@keepalive_timeout, @log) : nil
@connection_manager = ConnectionManager.new(
log: @log,
secure: !!@security,
connection_factory: method(:create_transfer_socket),
socket_cache: socket_cache,
)
configs = []
conf.elements(name: 'server').each do |s|
s.name = 'service'
end
unless conf.elements(name: 'service').empty?
new_elem = Fluent::Config::Element.new('static_service_discovery', {}, {}, conf.elements(name: 'service'))
configs << { type: :static, conf: new_elem }
end
conf.elements(name: 'service_discovery').each_with_index do |c, i|
configs << { type: @service_discovery[i][:@type], conf: c }
end
service_discovery_create_manager(
:out_forward_service_discovery_watcher,
configurations: configs,
load_balancer: LoadBalancer.new(log),
custom_build_method: method(:build_node),
)
discovery_manager.services.each do |server|
@nodes << server
unless @heartbeat_type == :none
begin
server.validate_host_resolution!
rescue => e
raise unless @ignore_network_errors_at_startup
log.warn "failed to resolve node name when configured", server: (server.name || server.host), error: e
server.disable!
end
end
end
unless @as_secondary
if @compress == :gzip && @buffer.compress == :text
@buffer.compress = :gzip
elsif @compress == :text && @buffer.compress == :gzip
log.info "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>"
end
end
if discovery_manager.services.empty?
raise Fluent::ConfigError, "forward output plugin requires at least one node is required. Add <server> or <service_discovery>"
end
if !@keepalive && @keepalive_timeout
log.warn('The value of keepalive_timeout is ignored. if you want to use keepalive, please add `keepalive true` to your conf.')
end
raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1
end
|
#create_transfer_socket(host, port, hostname, &block) ⇒ Object
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
|
# File 'lib/fluent/plugin/out_forward.rb', line 391
def create_transfer_socket(host, port, hostname, &block)
case @transport
when :tls
socket_create_tls(
host, port,
version: @tls_version,
ciphers: @tls_ciphers,
insecure: @tls_insecure_mode,
verify_fqdn: @tls_verify_hostname,
fqdn: hostname,
allow_self_signed_cert: @tls_allow_self_signed_cert,
cert_paths: @tls_ca_cert_path,
cert_path: @tls_client_cert_path,
private_key_path: @tls_client_private_key_path,
private_key_passphrase: @tls_client_private_key_passphrase,
cert_thumbprint: @tls_cert_thumbprint,
cert_logical_store_name: @tls_cert_logical_store_name,
cert_use_enterprise_store: @tls_cert_use_enterprise_store,
linger_timeout: Fluent.windows? ? nil : @send_timeout,
send_timeout: @send_timeout,
recv_timeout: @ack_response_timeout,
connect_timeout: @connect_timeout,
&block
)
when :tcp
socket_create_tcp(
host, port,
linger_timeout: @send_timeout,
send_timeout: @send_timeout,
recv_timeout: @ack_response_timeout,
connect_timeout: @connect_timeout,
&block
)
else
raise "BUG: unknown transport protocol #{@transport}"
end
end
|
454
455
456
|
# File 'lib/fluent/plugin/out_forward.rb', line 454
def
FORWARD_HEADER
end
|
#last_ack ⇒ Object
368
369
370
371
|
# File 'lib/fluent/plugin/out_forward.rb', line 368
def last_ack
overwrite_delayed_commit_timeout
ack_check(ack_select_interval)
end
|
#multi_workers_ready? ⇒ Boolean
287
288
289
|
# File 'lib/fluent/plugin/out_forward.rb', line 287
def multi_workers_ready?
true
end
|
#overwrite_delayed_commit_timeout ⇒ Object
295
296
297
298
299
300
301
302
|
# File 'lib/fluent/plugin/out_forward.rb', line 295
def overwrite_delayed_commit_timeout
if @delayed_commit_timeout != @ack_response_timeout
log.info "delayed_commit_timeout is overwritten by ack_response_timeout"
@delayed_commit_timeout = @ack_response_timeout + 2 end
end
|
#prefer_delayed_commit ⇒ Object
291
292
293
|
# File 'lib/fluent/plugin/out_forward.rb', line 291
def prefer_delayed_commit
@require_ack_response
end
|
#start ⇒ Object
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
|
# File 'lib/fluent/plugin/out_forward.rb', line 304
def start
super
unless @heartbeat_type == :none
if @heartbeat_type == :udp
@usock = socket_create_udp(discovery_manager.services.first.host, discovery_manager.services.first.port, nonblock: true)
server_create_udp(:out_forward_heartbeat_receiver, 0, socket: @usock, max_bytes: @read_length, &method(:on_udp_heatbeat_response_recv))
end
timer_execute(:out_forward_heartbeat_request, @heartbeat_interval, &method(:on_heartbeat_timer))
end
if @require_ack_response
overwrite_delayed_commit_timeout
thread_create(:out_forward_receiving_ack, &method(:ack_reader))
end
if @verify_connection_at_startup
discovery_manager.services.each do |node|
begin
node.verify_connection
rescue StandardError => e
log.fatal "forward's connection setting error: #{e.message}"
raise Fluent::UnrecoverableError, e.message
end
end
end
if @keepalive
timer_execute(:out_forward_keep_alived_socket_watcher, @keep_alive_watcher_interval, &method(:on_purge_obsolete_socks))
end
end
|
#statistics ⇒ Object
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
|
# File 'lib/fluent/plugin/out_forward.rb', line 435
def statistics
stats = super
services = discovery_manager.services
healthy_nodes_count = 0
registed_nodes_count = services.size
services.each do |s|
if s.available?
healthy_nodes_count += 1
end
end
stats.merge(
'healthy_nodes_count' => healthy_nodes_count,
'registered_nodes_count' => registed_nodes_count,
)
end
|
#stop ⇒ Object
345
346
347
348
349
350
351
|
# File 'lib/fluent/plugin/out_forward.rb', line 345
def stop
super
if @keepalive
@connection_manager.stop
end
end
|
#try_flush ⇒ Object
363
364
365
366
|
# File 'lib/fluent/plugin/out_forward.rb', line 363
def try_flush
return if @require_ack_response && @suspend_flush
super
end
|
#try_write(chunk) ⇒ Object
380
381
382
383
384
385
386
387
388
389
|
# File 'lib/fluent/plugin/out_forward.rb', line 380
def try_write(chunk)
log.trace "writing a chunk to destination", chunk_id: dump_unique_id_hex(chunk.unique_id)
if chunk.empty?
commit_write(chunk.unique_id)
return
end
tag = chunk.metadata.tag
discovery_manager.select_service { |node| node.send_data(tag, chunk) }
last_ack if @require_ack_response && @suspend_flush
end
|
#write(chunk) ⇒ Object
373
374
375
376
377
378
|
# File 'lib/fluent/plugin/out_forward.rb', line 373
def write(chunk)
return if chunk.empty?
tag = chunk.metadata.tag
discovery_manager.select_service { |node| node.send_data(tag, chunk) }
end
|