Class: Telekinesis::Producer::AsyncProducerWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/telekinesis/producer/async_producer_worker.rb

Constant Summary collapse

SHUTDOWN =
:shutdown

Instance Method Summary collapse

Constructor Details

#initialize(producer, queue, send_size, send_every, retries, retry_interval) ⇒ AsyncProducerWorker

Returns a new instance of AsyncProducerWorker.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/telekinesis/producer/async_producer_worker.rb', line 11

# Sets up the worker's state; no records are polled or sent here.
#
# @param producer the owning producer; must respond to +stream+, +client+
#   and +failure_handler+, which are cached on the worker
# @param queue the queue that +run+ polls with a millisecond timeout
# @param send_size batch-size setting (presumably the threshold behind
#   +buffer_full+ -- that helper is defined elsewhere, confirm there)
# @param send_every interval in milliseconds added to the last poll time
#   to compute how long +run+ waits on the queue
# @param retries passed through to +put_records+ on every flush
# @param retry_interval passed through to +put_records+ on every flush
def initialize(producer, queue, send_size, send_every, retries, retry_interval)
  @producer, @queue = producer, queue

  # Batching and retry settings, stored exactly as given.
  @send_size, @send_every = send_size, send_every
  @retries, @retry_interval = retries, retry_interval

  # Shortcuts to the producer's collaborators so the worker does not have
  # to reach through @producer each time.
  @stream = producer.stream
  @client = producer.client
  @failure_handler = producer.failure_handler

  # Mutable loop state: pending records, time of the last poll, and the
  # flag that ends the run loop.
  @buffer = []
  @last_poll_at = current_time_millis
  @shutdown = false
end

Instance Method Details

#run ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/telekinesis/producer/async_producer_worker.rb', line 28

# Main worker loop. Repeatedly polls the queue, buffers whatever arrives and
# flushes the buffer via +put_records+ until the SHUTDOWN sentinel is seen.
#
# Each iteration:
# * waits on the queue for at most the time remaining until
#   <tt>@last_poll_at + @send_every</tt> (never a negative wait);
# * treats SHUTDOWN like an empty poll, but also flags the loop to stop;
# * buffers any real item;
# * flushes when the buffer is full, or when the poll produced no item
#   (timeout or shutdown) and there are buffered records;
# * records the current time as the new last-poll time.
#
# Any StandardError is reported with +warn+ and then re-raised unchanged.
def run
  loop do
    remaining = (@last_poll_at + @send_every) - current_time_millis
    item = @queue.poll([0, remaining].max, TimeUnit::MILLISECONDS)

    # The sentinel is never buffered: it becomes an "empty" poll so that any
    # pending records get flushed one last time below.
    if item == SHUTDOWN
      item = nil
      @shutdown = true
    end

    buffer(item) unless item.nil?

    flush_now = buffer_full || (item.nil? && buffer_has_records)
    put_records(get_and_reset_buffer, @retries, @retry_interval) if flush_now

    @last_poll_at = current_time_millis
    break if @shutdown
  end
rescue => e
  # TODO: is there a way to encourage people to set up an uncaught exception
  # handler and/or disable this?
  trace = (e.backtrace || []).map { |line| "!  #{line}" }.join("\n")
  warn "Producer background thread died!"
  warn "#{e.class}: #{e.message}\n#{trace}"
  raise
end