Class: Nagare::ListenerPool

Inherits:
Object
Defined in:
lib/nagare/listener_pool.rb

Overview

ListenerPool acts both as a registry of all listeners in the application and as the polling mechanism that retrieves messages from Redis using consumer groups and delivers them to registered listeners one at a time.

Class Method Details

.listener_pool ⇒ Hash

A registry of listeners in the format { stream: [listeners…]}

Returns:

  • (Hash)

    listeners



# File 'lib/nagare/listener_pool.rb', line 14

def listener_pool
  listeners.each_with_object({}) do |listener, hash|
    stream = listener.stream_name

    unless hash.key?(listener.stream_name)
      logger.debug "Assigned stream #{stream} - listener #{listener.name}"
      create_and_subscribe_to_stream(stream)
      hash[stream] = []
    end
    hash[stream] << listener
    hash
  end
end
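The grouping step above can be sketched in isolation. This is a minimal illustration, not Nagare code: `FakeListener`, `LISTENERS`, `group_by_stream`, and `REGISTRY` are hypothetical names, and the subscription side effect (`create_and_subscribe_to_stream`) is left out.

```ruby
# Hypothetical stand-ins for listener classes (not part of Nagare).
FakeListener = Struct.new(:name, :stream_name)

LISTENERS = [
  FakeListener.new('BillingListener', :orders),
  FakeListener.new('ShippingListener', :orders),
  FakeListener.new('AuditListener', :users)
].freeze

# Group listeners by stream, creating the bucket the first time a stream is seen.
# each_with_object returns the accumulator itself, so the block's own return
# value is ignored.
def group_by_stream(listeners)
  listeners.each_with_object({}) do |listener, hash|
    stream = listener.stream_name
    hash[stream] ||= [] # the real method also subscribes to the stream here
    hash[stream] << listener
  end
end

REGISTRY = group_by_stream(LISTENERS)
puts REGISTRY.transform_values { |ls| ls.map(&:name) }.inspect
```

As the source listing shows, the hash is not memoized: it is rebuilt on every call, so each call scans the listeners again.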

.listeners ⇒ Object

Returns every loaded class that descends from Nagare::Listener, found by scanning ObjectSpace.



# File 'lib/nagare/listener_pool.rb', line 28

def listeners
  ObjectSpace.each_object(Class).select do |klass|
    klass < Nagare::Listener
  end
end
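The discovery technique used above relies on `Class#<`, which returns `true` only for strict descendants. A small sketch with a hypothetical base class (`BaseListener` stands in for `Nagare::Listener`; none of these class names come from Nagare):

```ruby
# Hypothetical base class standing in for Nagare::Listener.
class BaseListener; end
class OrdersListener < BaseListener; end
class PriorityOrdersListener < OrdersListener; end
class Unrelated; end

# Class#< returns true for strict descendants, nil for unrelated classes,
# and false for the class itself or its ancestors, so select keeps only
# direct and indirect subclasses.
def descendants_of(base)
  ObjectSpace.each_object(Class).select { |klass| klass < base }
end

FOUND = descendants_of(BaseListener)
puts FOUND.map(&:name).sort.inspect
```

One consequence of this approach is that a listener class is only found after its file has been loaded, so in an application with lazy loading the listeners likely need to be required before polling starts.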

.poll ⇒ Object

Polls Redis for new messages on all registered streams and delivers them to the registered listeners. If a listener does not raise an error, the message is automatically ACKed in the Redis consumer group.



# File 'lib/nagare/listener_pool.rb', line 53

def poll
  listener_pool.each do |stream, listeners|
    poll_stream(stream, listeners)
  end
end
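The source of `poll_stream` is not shown on this page, so the following is only a sketch of the "ACK on success" rule described above, using an in-memory stand-in instead of Redis. `FakeGroup`, `deliver`, `GROUP`, and `HANDLER` are hypothetical, and rescuing the error (rather than letting it propagate) is an assumption.

```ruby
# In-memory stand-in for a consumer group: records which message ids were ACKed.
class FakeGroup
  attr_reader :acked

  def initialize
    @acked = []
  end

  def ack(id)
    @acked << id
  end
end

# Deliver one message; ACK only when the handler returns without raising.
# A failed message is not ACKed, so it stays pending in the group.
def deliver(group, id, payload, handler)
  handler.call(payload)
  group.ack(id)
  true
rescue StandardError => e
  warn "message #{id} left pending: #{e.message}"
  false
end

GROUP = FakeGroup.new
HANDLER = lambda do |payload|
  raise ArgumentError, 'bad payload' if payload.nil?
end

deliver(GROUP, '1-0', { 'event' => 'created' }, HANDLER) # succeeds, ACKed
deliver(GROUP, '2-0', nil, HANDLER)                      # raises, not ACKed
```

With a real consumer group, a message that is never ACKed remains in the group's pending entries list, which is what makes a later retry possible.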

.start_listening ⇒ Thread

Starts a background thread that polls Redis in a loop, sleeping one second between polls, and distributes messages to listeners.

Returns:

  • (Thread)

    the listening thread



# File 'lib/nagare/listener_pool.rb', line 39

def start_listening
  logger.info 'Starting Nagare thread'
  Thread.new do
    loop do
      poll
      sleep 1
    end
  end
end
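Because the loop above never exits by itself, the caller has to keep the returned Thread and stop it explicitly. A minimal sketch of that pattern, with a stand-in for the polling work (`TICKS`, `start_polling`, and `WORKER` are hypothetical names, and the short interval is only there to keep the example fast):

```ruby
# Stand-in for the polling work: records each iteration in a thread-safe queue.
TICKS = Queue.new

# Same shape as start_listening: an endless loop inside a new thread.
def start_polling(interval)
  Thread.new do
    loop do
      TICKS << Time.now
      sleep interval
    end
  end
end

WORKER = start_polling(0.01)
sleep 0.1   # let the loop run a few iterations
WORKER.kill # the loop has no exit condition, so stop the thread explicitly
WORKER.join # wait until the thread has actually terminated
puts "polled #{TICKS.size} times, alive: #{WORKER.alive?}"
```

Note that an exception escaping the block ends a thread, so an error raised during polling would stop the loop unless it is rescued somewhere inside it.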