Class: Kafka::AsyncProducer::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/async_producer.rb

Instance Method Summary collapse

Constructor Details

#initialize(queue:, producer:, delivery_threshold:, max_retries: -1,, retry_backoff: 0, instrumenter:, logger:) ⇒ Worker

Returns a new instance of Worker.



201
202
203
204
205
206
207
208
209
# File 'lib/kafka/async_producer.rb', line 201

def initialize(queue:, producer:, delivery_threshold:, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
  @queue = queue
  @producer = producer
  @delivery_threshold = delivery_threshold
  @max_retries = max_retries
  @retry_backoff = retry_backoff
  @instrumenter = instrumenter
  @logger = TaggedLogger.new(logger)
end

Instance Method Details

#runObject



211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/kafka/async_producer.rb', line 211

def run
  @logger.push_tags(@producer.to_s)
  @logger.info "Starting async producer in the background..."

  do_loop
rescue Exception => e
  @logger.error "Unexpected Kafka error #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
  @logger.error "Async producer crashed!"
ensure
  @producer.shutdown
  @logger.pop_tags
end