Class: Hermann::Consumer
- Inherits:
-
Object
- Object
- Hermann::Consumer
- Defined in:
- lib/hermann/consumer.rb
Overview
Hermann::Consumer provides a simple consumer API which is only safe to be executed in a single thread
Instance Attribute Summary collapse
-
#internal ⇒ Object
readonly
Returns the value of attribute internal.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Instance Method Summary collapse
-
#consume(topic = nil, &block) ⇒ Object
Delegates the consume method to internal consumer classes.
-
#initialize(topic, opts = {}) ⇒ Consumer
constructor
Instantiate Consumer.
-
#shutdown ⇒ Object
Delegates the shutdown of kafka messages threads to internal consumer classes.
Constructor Details
#initialize(topic, opts = {}) ⇒ Consumer
Instantiate Consumer
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/hermann/consumer.rb', line 27 def initialize(topic, opts = {}) @topic = topic offset = opts.delete(:offset) raise Hermann::Errors::InvalidOffsetError.new("Bad offset: #{offset}") unless valid_offset?(offset) if Hermann.jruby? zookeepers, group_id = require_values_at(opts, :zookeepers, :group_id) @internal = Hermann::Provider::JavaSimpleConsumer.new(zookeepers, group_id, topic, opts) else brokers, partition = require_values_at(opts, :brokers, :partition) @internal = Hermann::Provider::RDKafka::Consumer.new(topic, brokers, partition, offset) end end |
Instance Attribute Details
#internal ⇒ Object (readonly)
Returns the value of attribute internal.
14 15 16 |
# File 'lib/hermann/consumer.rb', line 14 def internal @internal end |
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
14 15 16 |
# File 'lib/hermann/consumer.rb', line 14 def topic @topic end |
Instance Method Details
#consume(topic = nil, &block) ⇒ Object
Delegates the consume method to internal consumer classes
45 46 47 |
# File 'lib/hermann/consumer.rb', line 45 def consume(topic=nil, &block) @internal.consume(topic, &block) end |