Class: Hermann::Provider::JavaSimpleConsumer
- Inherits:
-
Object
- Object
- Hermann::Provider::JavaSimpleConsumer
- Defined in:
- lib/hermann/provider/java_simple_consumer.rb
Overview
Implements a java based consumer. The #consumer method loops infinitely, the hasNext() method blocks until a message is available.
Constant Summary collapse
- NUM_THREADS =
1
- DEFAULTS_HERMANN_OPTS =
default zookeeper connection options
{ 'zookeeper.session.timeout.ms' => '400', 'zookeeper.sync.time.ms' => '200', 'auto.commit.interval.ms' => '1000', }.freeze
- DEFAULT_CONSUMER_OPTIONS =
{ :do_retry => true, :max_retries => 3, :backoff_time_sec => 1, :logger => nil }.freeze
Instance Attribute Summary collapse
-
#consumer ⇒ Object
Returns the value of attribute consumer.
-
#topic ⇒ Object
Returns the value of attribute topic.
-
#zookeeper ⇒ Object
Returns the value of attribute zookeeper.
Instance Method Summary collapse
-
#consume(topic = nil) ⇒ Object
Starts infinite loop to consume messages.
-
#initialize(zookeepers, groupId, topic, opts = {}) ⇒ JavaSimpleConsumer
constructor
Instantiate JavaSimpleConsumer.
-
#shutdown ⇒ Object
Shuts down the various threads created by createMessageStreams This can be called after the thread executing consume has exited to clean up.
Constructor Details
#initialize(zookeepers, groupId, topic, opts = {}) ⇒ JavaSimpleConsumer
Instantiate JavaSimpleConsumer
42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/hermann/provider/java_simple_consumer.rb', line 42 def initialize(zookeepers, groupId, topic, opts={}) @topic = topic = DEFAULT_CONSUMER_OPTIONS.merge(opts) @backoff_time_sec = .delete(:backoff_time_sec) @do_retry = .delete(:do_retry) @max_retries = .delete(:max_retries) @logger = .delete(:logger) # deleting options above so that they do not get sent to # the create_config method config = create_config(zookeepers, groupId, ) @consumer = ConsumerUtil::Consumer.createJavaConsumerConnector(config) end |
Instance Attribute Details
#consumer ⇒ Object
Returns the value of attribute consumer.
10 11 12 |
# File 'lib/hermann/provider/java_simple_consumer.rb', line 10 def consumer @consumer end |
#topic ⇒ Object
Returns the value of attribute topic.
10 11 12 |
# File 'lib/hermann/provider/java_simple_consumer.rb', line 10 def topic @topic end |
#zookeeper ⇒ Object
Returns the value of attribute zookeeper.
10 11 12 |
# File 'lib/hermann/provider/java_simple_consumer.rb', line 10 def zookeeper @zookeeper end |
Instance Method Details
#consume(topic = nil) ⇒ Object
Starts infinite loop to consume messages. hasNext() blocks until a message is available at which point it is yielded to the block
Examples
consumer.consume do |message|
puts "Received: #{}"
end
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/hermann/provider/java_simple_consumer.rb', line 73 def consume(topic=nil) begin stream = get_stream(topic) it = stream.iterator while it.hasNext do = it.next. yield String.from_java_bytes() end rescue => e if retry? && @max_retries > 0 sleep @backoff_time_sec @max_retries -= 1 retry else log_exception(e) raise e end end end |
#shutdown ⇒ Object
Shuts down the various threads created by createMessageStreams This can be called after the thread executing consume has exited to clean up.
58 59 60 |
# File 'lib/hermann/provider/java_simple_consumer.rb', line 58 def shutdown @consumer.shutdown end |