45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 45
# Builds a TopicProducer for +topic+.
#
# A cluster is obtained from +initialize_cluster+ and handed to both the
# TransactionManager and the TopicProducer. The compression options are given
# to a new Compressor, the idempotence/transaction options to a new
# TransactionManager, and the remaining options are forwarded unchanged to
# TopicProducer.new together with +@logger+ and +@instrumenter+.
#
# @param topic first positional argument of TopicProducer.new
# @param compression_codec passed to Compressor as +codec_name+
# @param compression_threshold passed to Compressor as +threshold+
# @param ack_timeout forwarded to TopicProducer
# @param required_acks forwarded to TopicProducer
# @param max_retries forwarded to TopicProducer
# @param retry_backoff forwarded to TopicProducer
# @param max_buffer_size forwarded to TopicProducer
# @param max_buffer_bytesize forwarded to TopicProducer
# @param idempotent forwarded to TransactionManager
# @param transactional forwarded to TransactionManager
# @param transactional_id forwarded to TransactionManager
# @param transactional_timeout forwarded to TransactionManager
# @return [TopicProducer] the newly constructed producer
def topic_producer(
  topic,
  compression_codec: nil,
  compression_threshold: 1,
  ack_timeout: 5,
  required_acks: :all,
  max_retries: 2,
  retry_backoff: 1,
  max_buffer_size: 1000,
  max_buffer_bytesize: 10_000_000,
  idempotent: false,
  transactional: false,
  transactional_id: nil,
  transactional_timeout: 60
)
  # Construction order is kept as cluster -> compressor -> transaction manager
  # so that the first error raised for invalid options stays the same.
  kafka_cluster = initialize_cluster

  message_compressor = Compressor.new(
    codec_name: compression_codec,
    threshold: compression_threshold,
    instrumenter: @instrumenter
  )

  txn_manager = TransactionManager.new(
    cluster: kafka_cluster,
    logger: @logger,
    idempotent: idempotent,
    transactional: transactional,
    transactional_id: transactional_id,
    transactional_timeout: transactional_timeout
  )

  TopicProducer.new(
    topic,
    cluster: kafka_cluster,
    transaction_manager: txn_manager,
    logger: @logger,
    instrumenter: @instrumenter,
    compressor: message_compressor,
    ack_timeout: ack_timeout,
    required_acks: required_acks,
    max_retries: max_retries,
    retry_backoff: retry_backoff,
    max_buffer_size: max_buffer_size,
    max_buffer_bytesize: max_buffer_bytesize
  )
end
|