Class: MultipleMan::Listener

Inherits:
Object
  • Object
show all
Defined in:
lib/multiple_man/listener.rb

Direct Known Subclasses

SeederListener

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(subscription) ⇒ Listener

Returns a new instance of Listener.



20
21
22
23
# File 'lib/multiple_man/listener.rb', line 20

# Builds a listener for a single subscription.
#
# Stores the subscription through the +subscription=+ writer and then calls
# #init_connection so the listener has a connection before it is used.
#
# @param subscription the subscription this listener serves; other methods
#   on this class read +klass+ and +routing_key+ from it and dispatch
#   incoming messages to it.
def initialize(subscription)
  self.subscription = subscription
  init_connection
end

Instance Attribute Details

#connection ⇒ Object

Returns the value of attribute connection.



31
32
33
# File 'lib/multiple_man/listener.rb', line 31

# Reader for the +@connection+ instance variable.
#
# #init_connection assigns a MultipleMan::Connection through the
# +connection=+ writer; this returns whatever is currently stored.
#
# @return the stored connection, or nil if none has been assigned yet
def connection
  @connection
end

#subscription ⇒ Object

Returns the value of attribute subscription.



31
32
33
# File 'lib/multiple_man/listener.rb', line 31

# Reader for the +@subscription+ instance variable.
#
# The constructor assigns its argument through the +subscription=+ writer;
# this returns whatever is currently stored.
#
# @return the stored subscription, or nil if none has been assigned yet
def subscription
  @subscription
end

Class Method Details

.start ⇒ Object



8
9
10
11
12
13
14
15
# File 'lib/multiple_man/listener.rb', line 8

# Starts one listener per registered subscription.
#
# Writes a start-up notice and the JSON form of the registered subscriptions
# to the debug log, then builds a listener for each subscription and calls
# #listen on it.
#
# @return whatever +each+ returns for Subscribers::Registry.subscriptions
def start
  MultipleMan.logger.debug("Starting listeners.")
  MultipleMan.logger.debug(Subscribers::Registry.subscriptions.to_json)

  Subscribers::Registry.subscriptions.each { |registered| new(registered).listen }
end

Instance Method Details

#handle_error(ex, delivery_info) ⇒ Object



53
54
55
56
57
58
59
# File 'lib/multiple_man/listener.rb', line 53

# Reports a failed message and hands it back to the broker.
#
# The exception's message and backtrace are written to the error log, the
# exception is passed to MultipleMan.error, and the delivery is then
# negatively acknowledged so that it is requeued.
#
# @param ex [Exception] the error raised while handling the message
# @param delivery_info the delivery metadata; only +delivery_tag+ is read
# @return the result of the channel's +nack+ call
def handle_error(ex, delivery_info)
  logger = MultipleMan.logger
  logger.error("   Error - #{ex.message}\n\n#{ex.backtrace}")
  MultipleMan.error(ex)

  # Requeue the message (third argument). The second argument is presumably
  # the "multiple" flag of the underlying AMQP client -- confirm.
  channel = queue.channel
  channel.nack(delivery_info.delivery_tag, false, true)
end

#init_connection ⇒ Object



25
26
27
28
29
# File 'lib/multiple_man/listener.rb', line 25

# Opens a channel for this listener and wraps it in a MultipleMan::Connection.
#
# The channel is created on the object returned by
# MultipleMan::Connection.connection, given a prefetch of 100, and the
# resulting wrapper is stored through the +connection=+ writer.
#
# NOTE(review): the two create_channel arguments look like a channel id
# (nil) and a consumer pool size taken from configuration -- confirm against
# the AMQP client in use.
#
# @return [MultipleMan::Connection] the wrapper that was just assigned
def init_connection
  worker_channel = MultipleMan::Connection.connection.create_channel(
    nil,
    MultipleMan.configuration.worker_concurrency
  )
  worker_channel.prefetch(100)

  self.connection = MultipleMan::Connection.new(worker_channel)
end

#listen ⇒ Object



33
34
35
36
37
38
39
# File 'lib/multiple_man/listener.rb', line 33

# Binds this listener's queue to the topic exchange and starts consuming.
#
# Logs the subscribed class and routing key, binds the queue to
# +connection.topic+ using #routing_key, and subscribes with +ack: true+.
# Every delivery is forwarded to #process_message.
#
# @return the result of the queue's +subscribe+ call
def listen
  MultipleMan.logger.info "Listening for #{subscription.klass} with routing key #{routing_key}."
  # The subscribe block is yielded three values; the middle one is not used
  # here, so it gets an underscore-prefixed name.
  queue.bind(connection.topic, routing_key: routing_key).subscribe(ack: true) do |delivery_info, _properties, payload|
    process_message(delivery_info, payload)
  end
end

#operation(delivery_info) ⇒ Object



61
62
63
# File 'lib/multiple_man/listener.rb', line 61

# Extracts the operation name from a delivery's routing key.
#
# The routing key is split on "." and the final segment is returned.
#
# @param delivery_info the delivery metadata; only +routing_key+ is read
# @return [String, nil] the last dot-separated segment, or nil when the
#   routing key is empty
def operation(delivery_info)
  segments = delivery_info.routing_key.split(".")
  segments.last
end

#process_message(delivery_info, payload) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/multiple_man/listener.rb', line 41

# Dispatches one delivered message to the subscription and settles it.
#
# The payload is parsed as JSON, converted with +with_indifferent_access+,
# and passed to the subscription method named by #operation. On success the
# delivery is acknowledged; on failure #handle_error is called with the
# exception and the delivery.
#
# @param delivery_info the delivery metadata; +routing_key+ and
#   +delivery_tag+ are read
# @param payload [String] the raw message body, expected to be JSON
# @return the result of the acknowledge call on success, or of
#   #handle_error on failure
# @raise [Exception] anything that is neither a StandardError nor a
#   ScriptError (for example Interrupt, SystemExit, NoMemoryError) is
#   deliberately left to propagate
def process_message(delivery_info, payload)
  MultipleMan.logger.info "Processing message for #{delivery_info.routing_key}."
  begin
    # NOTE(review): the method name comes from the routing key and `send`
    # can reach private methods; consider restricting it to a known set of
    # operations.
    subscription.send(operation(delivery_info), JSON.parse(payload).with_indifferent_access)
  rescue StandardError, ScriptError => ex
    # ScriptError is included so that NotImplementedError (which is not a
    # StandardError) raised by a subscriber is still reported and requeued.
    # Process-level exceptions are not caught here.
    handle_error(ex, delivery_info)
  else
    MultipleMan.logger.debug "   Successfully processed!"
    queue.channel.acknowledge(delivery_info.delivery_tag, false)
  end
end

#queue ⇒ Object



65
66
67
# File 'lib/multiple_man/listener.rb', line 65

# Asks the connection for this listener's queue.
#
# The queue is requested by +queue_name+ with +durable: true+ and
# +auto_delete: false+. The result is not cached here, so every call goes
# back to the connection.
#
# @return whatever +connection.queue+ returns
def queue
  connection.queue(
    queue_name,
    durable: true,
    auto_delete: false
  )
end

#routing_key ⇒ Object



69
70
71
# File 'lib/multiple_man/listener.rb', line 69

# Routing key used when binding the queue in #listen.
#
# Delegates directly to the subscription.
#
# @return whatever +subscription.routing_key+ returns
def routing_key
  subscription.routing_key
end