Class: Kafka::Consumer

Inherits:
Object
  • Object
show all
Includes:
Java::JavaLang::Runnable
Defined in:
lib/jruby-kafka/consumer.rb

Instance Method Summary collapse

Constructor Details

#initialize(a_stream, a_threadNumber, a_queue, a_bool_restart_on_exception, a_sleep_ms) ⇒ Consumer

Returns a new instance of Consumer.



17
18
19
20
21
22
23
# File 'lib/jruby-kafka/consumer.rb', line 17

def initialize(a_stream, a_threadNumber, a_queue, a_bool_restart_on_exception, a_sleep_ms)
  @m_threadNumber = a_threadNumber
  @m_stream = a_stream
  @m_queue = a_queue
  @m_restart_on_exception = a_bool_restart_on_exception
  @m_sleep_ms = 1.0 / 1000.0 * Float(a_sleep_ms)
end

Instance Method Details

#runObject



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/jruby-kafka/consumer.rb', line 25

def run
  it = @m_stream.iterator()
  begin
    while it.hasNext()
      begin
        @m_queue << it.next().message()
      end
    end
  rescue Exception => e
    puts("#{self.class.name} caught exception: #{e.class.name}")
    puts(e.message) if e.message != ''
    if @m_restart_on_exception
      sleep(@m_sleep_ms)
      retry
    else
      raise e
    end
  end
end