Class: Kafka::TopicProducer
- Inherits:
-
Object
- Object
- Kafka::TopicProducer
- Defined in:
- lib/fluent/plugin/kafka_producer_ext.rb
Instance Method Summary
- #buffer_bytesize ⇒ Object
-
#buffer_size ⇒ Integer
Returns the number of messages currently held in the buffer.
-
#clear_buffer ⇒ nil
Deletes all buffered messages.
- #deliver_messages ⇒ Object
-
#initialize(topic, cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:) ⇒ TopicProducer
constructor
Returns a new instance of TopicProducer.
- #produce(value, key, partition, partition_key) ⇒ Object
-
#shutdown ⇒ nil
Closes all connections to the brokers.
Constructor Details
#initialize(topic, cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:) ⇒ TopicProducer
Returns a new instance of TopicProducer.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 59

# Builds a producer bound to a single topic.
#
# Every argument is stored in an instance variable of the same name, the
# topic is registered with the cluster as a target topic, and two empty
# buffers are created.
#
# @param topic the topic this producer writes to
# @param cluster object that receives `add_target_topics` with a Set
#   containing +topic+
# @param logger stored as given
# @param instrumenter stored as given
# @param compressor stored as given
# @param ack_timeout stored as given
# @param required_acks `:all` is stored as -1; any other value is stored
#   unchanged
# @param max_retries stored as given
# @param retry_backoff stored as given
# @param max_buffer_size stored as given
# @param max_buffer_bytesize stored as given
def initialize(topic, cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
  @cluster = cluster
  @logger = logger
  @instrumenter = instrumenter
  @required_acks = required_acks == :all ? -1 : required_acks
  @ack_timeout = ack_timeout
  @max_retries = max_retries
  @retry_backoff = retry_backoff
  @max_buffer_size = max_buffer_size
  @max_buffer_bytesize = max_buffer_bytesize
  @compressor = compressor

  @topic = topic
  @cluster.add_target_topics(Set.new([topic]))

  # A buffer organized by topic/partition.
  @buffer = MessageBuffer.new

  # Messages added by `#produce` but not yet assigned a partition.
  @pending_message_queue = PendingMessageQueue.new
end
Instance Method Details
#buffer_bytesize ⇒ Object
112 113 114 |
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 112

# Returns the combined byte size of the pending message queue and the
# partitioned buffer.
#
# @return the sum of `@pending_message_queue.bytesize` and `@buffer.bytesize`
def buffer_bytesize
  @pending_message_queue.bytesize + @buffer.bytesize
end
#buffer_size ⇒ Integer
Returns the number of messages currently held in the buffer.
108 109 110 |
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 108

# Returns the number of messages currently held, counting both the pending
# message queue and the partitioned buffer.
#
# @return the sum of `@pending_message_queue.size` and `@buffer.size`
def buffer_size
  @pending_message_queue.size + @buffer.size
end
#clear_buffer ⇒ nil
Deletes all buffered messages.
119 120 121 122 |
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 119

# Deletes all buffered messages by clearing the partitioned buffer first and
# the pending message queue second.
#
# @return the result of `@pending_message_queue.clear` (documented as nil)
def clear_buffer
  @buffer.clear
  @pending_message_queue.clear
end
#deliver_messages ⇒ Object
98 99 100 101 102 103 |
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 98

# Entry point for delivering buffered messages; returns immediately when
# nothing is buffered.
#
# NOTE(review): the line gutter for this listing covers six source lines
# (98-103) but only the guard below survived in this listing. A statement
# after the guard (presumably the call that performs the delivery) appears
# to be missing; confirm against lib/fluent/plugin/kafka_producer_ext.rb.
def deliver_messages
  # There's no need to do anything if the buffer is empty.
  return if buffer_size == 0
end
#produce(value, key, partition, partition_key) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 81

# Queues one message for this producer's topic.
#
# The arguments are wrapped, together with `@topic` and the current time, in
# a PendingMessage that is written to the pending message queue.
#
# @param value passed through to PendingMessage as the message value
# @param key passed through to PendingMessage as the message key
# @param partition passed through to PendingMessage
# @param partition_key passed through to PendingMessage
# @return [nil]
def produce(value, key, partition, partition_key)
  create_time = Time.now

  message = PendingMessage.new(
    value,
    key,
    @topic,
    partition,
    partition_key,
    create_time
  )

  @pending_message_queue.write(message)

  nil
end
#shutdown ⇒ nil
Closes all connections to the brokers.
127 128 129 |
# File 'lib/fluent/plugin/kafka_producer_ext.rb', line 127

# Closes all connections to the brokers by asking the cluster to disconnect.
#
# @return the result of `@cluster.disconnect` (documented as nil)
def shutdown
  @cluster.disconnect
end