Class: Snoopka::Listener

Inherits:
Object
  • Object
show all
Defined in:
lib/snoopka/listener.rb

Instance Method Summary collapse

Constructor Details

#initialize(settings = {}) ⇒ Listener

Returns a new instance of Listener.



6
7
8
9
10
# File 'lib/snoopka/listener.rb', line 6

# Builds a listener that starts with no observers and no consumers.
#
# @param settings [Hash] connection options; +create_consumer+ reads the
#   +:host+ and +:port+ keys from it
def initialize(settings = {})
  # @observers maps a topic to its list of blocks; @consumers holds one entry
  # per consumer created by add_observer.
  @settings, @observers, @consumers = settings, {}, []
end

Instance Method Details

#add_observer(topic = '', &proc) ⇒ Object



24
25
26
27
28
# File 'lib/snoopka/listener.rb', line 24

# Registers a block to be called with the messages read from +topic+.
#
# A consumer is created only the first time a topic is registered. +consume+
# notifies every observer of a topic once per consumer, so creating a consumer
# on each call would make observers of a shared topic fire once for every
# observer registered on it.
#
# @param topic [String] topic name; used as the key of the observer table
# @param proc [Proc] block later invoked with a batch of messages
# @return [Array] the list of consumers created so far
def add_observer(topic = '', &proc)
  unless @observers.key?(topic)
    @observers[topic] = []
    @consumers << create_consumer(topic)
  end
  @observers[topic] << proc
  @consumers
end

#consume ⇒ Object

Loops through all consumers to read from Kafka.



38
39
40
41
42
43
# File 'lib/snoopka/listener.rb', line 38

# Polls each registered consumer once and forwards every non-empty batch to
# the observers of that consumer's topic.
#
# @return [Array] the list of consumers that was iterated
def consume
  @consumers.each do |source|
    batch = source.consume
    # Nothing was read from this consumer, so there is nobody to notify.
    next if batch.length.zero?

    notify_observers(source.topic, batch)
  end
end

#create_consumer(topic) ⇒ Object

Creates a Kafka consumer for the topic.



46
47
48
49
50
51
52
53
54
# File 'lib/snoopka/listener.rb', line 46

# Builds a Kafka::Consumer for +topic+ from the listener's settings.
#
# @param topic [String] topic name passed through as the +:topic+ option
# @return [Kafka::Consumer] the newly constructed consumer
def create_consumer(topic)
  # Fall back to a local broker on port 9092 when a setting is missing or nil.
  host = @settings[:host] || 'localhost'
  port = @settings[:port] || 9092

  options = { host: host, port: port, topic: topic }
  Kafka::Consumer.new(options)
end

#notify_observers(topic, messages) ⇒ Object

Loops through all observers of this topic and calls the associated blocks.



31
32
33
34
35
# File 'lib/snoopka/listener.rb', line 31

# Calls every block registered for +topic+ with +messages+.
#
# A topic with no registered observers is treated as having an empty list, so
# the call is a no-op rather than a NoMethodError on nil.
#
# @param topic [String] topic whose observers should be notified
# @param messages [Object] batch handed unchanged to each observer
# @return [Array] the observers that were called (empty for an unknown topic)
def notify_observers(topic, messages)
  (@observers[topic] || []).each do |observer|
    observer.call(messages)
  end
end

#observer_count ⇒ Object



20
21
22
# File 'lib/snoopka/listener.rb', line 20

# Number of entries in the observer table.
#
# The table is keyed by topic (see add_observer), so this is the number of
# topics that have observers, not the total number of registered blocks.
#
# @return [Integer]
def observer_count
  @observers.size
end

#observers ⇒ Object



16
17
18
# File 'lib/snoopka/listener.rb', line 16

# Returns the observer table itself (not a copy): a hash from topic to the
# list of blocks registered for it via add_observer.
#
# @return [Hash]
def observers
  @observers
end

#settings ⇒ Object



12
13
14
# File 'lib/snoopka/listener.rb', line 12

# Returns the settings object given to the constructor; create_consumer reads
# its :host and :port entries.
#
# @return [Hash]
def settings
  @settings
end