36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 36
def topic_producer(topic, compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: :all, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000)
compressor = Compressor.new(
codec_name: compression_codec,
threshold: compression_threshold,
instrumenter: @instrumenter,
)
TopicProducer.new(topic,
cluster: initialize_cluster,
logger: @logger,
instrumenter: @instrumenter,
compressor: compressor,
ack_timeout: ack_timeout,
required_acks: required_acks,
max_retries: max_retries,
retry_backoff: retry_backoff,
max_buffer_size: max_buffer_size,
max_buffer_bytesize: max_buffer_bytesize,
)
end
|