Class: Telekinesis::Producer::AsyncProducerWorker
- Inherits:
-
Object
- Object
- Telekinesis::Producer::AsyncProducerWorker
- Defined in:
- lib/telekinesis/producer/async_producer_worker.rb
Constant Summary collapse
- SHUTDOWN =
:shutdown
Instance Method Summary collapse
-
#initialize(producer, queue, send_size, send_every, retries, retry_interval) ⇒ AsyncProducerWorker
constructor
A new instance of AsyncProducerWorker.
- #run ⇒ Object
Constructor Details
#initialize(producer, queue, send_size, send_every, retries, retry_interval) ⇒ AsyncProducerWorker
Returns a new instance of AsyncProducerWorker.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/telekinesis/producer/async_producer_worker.rb', line 11

# Builds a worker around the given producer and work queue.
#
# @param producer the producer this worker sends on behalf of; must respond
#   to +stream+, +client+ and +failure_handler+
# @param queue the queue #run polls for items (polled with a timeout in
#   milliseconds; presumably a java.util.concurrent BlockingQueue -- confirm)
# @param send_size batch-size setting (presumably the threshold behind
#   +buffer_full+, which is defined elsewhere -- confirm)
# @param send_every interval in milliseconds; #run uses it to bound how long
#   a single queue poll may wait
# @param retries forwarded unchanged to +put_records+ by #run
# @param retry_interval forwarded unchanged to +put_records+ by #run
def initialize(producer, queue, send_size, send_every, retries, retry_interval)
  @producer = producer
  @queue = queue

  # Batching configuration.
  @send_size, @send_every = send_size, send_every

  # Retry configuration.
  @retries, @retry_interval = retries, retry_interval

  # Cached from the producer purely for convenience.
  @stream = producer.stream
  @client = producer.client
  @failure_handler = producer.failure_handler

  # Mutable worker state.
  @buffer = []
  @last_poll_at = current_time_millis
  @shutdown = false
end
Instance Method Details
#run ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/telekinesis/producer/async_producer_worker.rb', line 28

# Main loop of the worker. Repeatedly polls the queue, buffers whatever it
# receives, and sends the buffered records via +put_records+ when either the
# buffer is full or a poll produced no item while records are still buffered.
#
# Receiving the SHUTDOWN sentinel marks the worker as shutting down; that
# iteration is treated like an empty poll (so buffered records are flushed)
# and the loop exits afterwards.
#
# Any StandardError escaping the loop is reported with +warn+ (class,
# message and backtrace) and then re-raised.
#
# @raise [StandardError] whatever error terminated the loop
def run
  loop do
    # Wait at most until @send_every ms after the previous poll; never negative.
    next_wait = [0, (@last_poll_at + @send_every) - current_time_millis].max
    next_item = @queue.poll(next_wait, TimeUnit::MILLISECONDS)

    # The sentinel is never buffered: drop it and remember to stop.
    if next_item == SHUTDOWN
      next_item = nil
      @shutdown = true
    end

    buffer(next_item) unless next_item.nil?

    # Flush when full, or when nothing arrived (timeout or shutdown) and
    # there are still records waiting.
    if buffer_full || (next_item.nil? && buffer_has_records)
      put_records(get_and_reset_buffer, @retries, @retry_interval)
    end

    @last_poll_at = current_time_millis
    break if @shutdown
  end
rescue => e
  # TODO: is there a way to encourage people to set up an uncaught exception
  # handler and/or disable this?
  bt = e.backtrace ? e.backtrace.map { |l| "! #{l}" }.join("\n") : ""
  warn "Producer background thread died!"
  warn "#{e.class}: #{e.message}\n#{bt}"
  raise
end