Class: Snoopka::Listener
- Inherits: Object
  - Object
    - Snoopka::Listener
- Defined in: lib/snoopka/listener.rb
Instance Method Summary
- #add_observer(topic = '', &proc) ⇒ Object
- #consume ⇒ Object
  Loops through all consumers to read from Kafka.
- #create_consumer(topic) ⇒ Object
  Creates a Kafka consumer for the topic.
- #initialize(settings = {}) ⇒ Listener (constructor)
  Returns a new instance of Listener.
- #notify_observers(topic, messages) ⇒ Object
  Loops through all observers of this topic and calls the associated blocks.
- #observer_count ⇒ Object
- #observers ⇒ Object
- #settings ⇒ Object
Constructor Details
#initialize(settings = {}) ⇒ Listener
Returns a new instance of Listener.
    # File 'lib/snoopka/listener.rb', line 6
    def initialize(settings = {})
      @settings = settings
      @observers = {}
      @consumers = []
    end
Instance Method Details
#add_observer(topic = '', &proc) ⇒ Object
    # File 'lib/snoopka/listener.rb', line 24
    def add_observer(topic = '', &proc)
      @observers[topic] ||= []
      @observers[topic] << proc
      @consumers << create_consumer(topic)
    end
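The bookkeeping in add_observer can be tried without a Kafka broker. The sketch below is a stand-in, not part of Snoopka: the class name ObserverRegistry is hypothetical, and it leaves out the create_consumer call. It shows that blocks are grouped per topic, so a count over the hash reports topics rather than blocks.

```ruby
# Hypothetical stand-in for the observer bookkeeping in Snoopka::Listener.
# It deliberately omits the Kafka consumer creation.
class ObserverRegistry
  attr_reader :observers

  def initialize
    @observers = {}
  end

  # Store the block under its topic, creating the list on first use.
  def add_observer(topic = '', &block)
    @observers[topic] ||= []
    @observers[topic] << block
  end

  # Counts hash keys, i.e. distinct topics, not individual blocks.
  def observer_count
    @observers.count
  end
end

registry = ObserverRegistry.new
registry.add_observer('orders')   { |messages| messages.size }
registry.add_observer('orders')   { |messages| messages.first }
registry.add_observer('payments') { |messages| messages.last }

topic_count  = registry.observer_count            # 2 topics
order_blocks = registry.observers['orders'].size  # 2 blocks
```

In the listing above, every call to add_observer also appends a new consumer, so registering two blocks for the same topic yields two consumers for that topic.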
#consume ⇒ Object
Loops through all consumers to read from Kafka.
    # File 'lib/snoopka/listener.rb', line 38
    def consume
      @consumers.each do |consumer|
        messages = consumer.consume
        notify_observers(consumer.topic, messages) if messages.length > 0
      end
    end
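As listed, consume makes a single pass over the consumers, so a caller that wants continuous delivery has to invoke it repeatedly. The following is a minimal sketch of that polling shape under stated assumptions: FakeConsumer is a hypothetical in-memory substitute for Kafka::Consumer, and the observers are assumed to receive the message batch as their only argument.

```ruby
# Hypothetical in-memory consumer standing in for Kafka::Consumer.
FakeConsumer = Struct.new(:topic, :batches) do
  # Return the next batch of messages, or an empty array when drained.
  def consume
    batches.shift || []
  end
end

consumers = [
  FakeConsumer.new('orders',   [%w[o1 o2], []]),
  FakeConsumer.new('payments', [[], %w[p1]])
]

received  = []
observers = {
  'orders'   => [->(messages) { received << ['orders', messages] }],
  'payments' => [->(messages) { received << ['payments', messages] }]
}

# One pass, mirroring the shape of Listener#consume: poll each consumer
# once and dispatch only non-empty batches.
poll_once = lambda do
  consumers.each do |consumer|
    messages = consumer.consume
    next if messages.empty?

    observers[consumer.topic].each { |observer| observer.call(messages) }
  end
end

# The caller decides how often to poll; two passes drain the fake data.
2.times { poll_once.call }
```

Empty batches are skipped, so observers are only called when a consumer actually returned messages.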
#create_consumer(topic) ⇒ Object
Creates a Kafka consumer for the topic.
    # File 'lib/snoopka/listener.rb', line 46
    def create_consumer(topic)
      Kafka::Consumer.new(
        {
          host: @settings[:host] || 'localhost',
          port: @settings[:port] || 9092,
          topic: topic
        }
      )
    end
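The option hash passed to Kafka::Consumer.new falls back to localhost and port 9092 when the settings omit them. The sketch below isolates that fallback in a helper named consumer_options, which is hypothetical and exists only for illustration. It also shows that the lookup uses symbol keys, so string keys in the settings hash are not picked up.

```ruby
# Hypothetical helper mirroring the option hash built in create_consumer.
def consumer_options(settings, topic)
  {
    host: settings[:host] || 'localhost',
    port: settings[:port] || 9092,
    topic: topic
  }
end

# No settings: both defaults apply.
defaults = consumer_options({}, 'orders')

# Symbol keys override the defaults.
custom = consumer_options({ host: 'kafka.internal', port: 9093 }, 'orders')

# String keys are not found by settings[:host], so the default is used.
stringy = consumer_options({ 'host' => 'kafka.internal' }, 'orders')
```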
#notify_observers(topic, messages) ⇒ Object
Loops through all observers of this topic and calls the associated blocks.
    # File 'lib/snoopka/listener.rb', line 31
    def notify_observers(topic, messages)
      @observers[topic].each do |observer|
        observer.call messages
      end
    end
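Because notify_observers indexes the observers hash directly, calling it with a topic that was never registered would invoke each on nil. The sketch below reproduces that lookup on a plain hash and contrasts it with a defensive variant using Hash#fetch with a default. The variant is an assumption for illustration, not something Snoopka does.

```ruby
observers = { 'orders' => [->(messages) { messages.size }] }

# Direct lookup, as in the listing: a missing topic yields nil,
# and nil does not respond to each.
error = nil
begin
  observers['unknown'].each { |observer| observer.call([]) }
rescue NoMethodError => e
  error = e
end

# Defensive variant (an assumption, not part of Snoopka):
# fall back to an empty list so unknown topics are a no-op.
unknown_results = observers.fetch('unknown', []).map { |o| o.call([]) }
known_results   = observers.fetch('orders', []).map { |o| o.call(%w[a b c]) }
```

Within the class itself this case does not arise from consume, since every consumer is created by add_observer for a topic that already has at least one block.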
#observer_count ⇒ Object
    # File 'lib/snoopka/listener.rb', line 20
    def observer_count
      @observers.count
    end
#observers ⇒ Object
    # File 'lib/snoopka/listener.rb', line 16
    def observers
      @observers
    end
#settings ⇒ Object
    # File 'lib/snoopka/listener.rb', line 12
    def settings
      @settings
    end