Class: MultipleMan::Listener
- Inherits:
-
Object
- Object
- MultipleMan::Listener
- Defined in:
- lib/multiple_man/listener.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#connection ⇒ Object
Returns the value of attribute connection.
-
#subscription ⇒ Object
Returns the value of attribute subscription.
Class Method Summary collapse
Instance Method Summary collapse
- #handle_error(ex, delivery_info) ⇒ Object
- #init_connection ⇒ Object
-
#initialize(subscription) ⇒ Listener
constructor
A new instance of Listener.
- #listen ⇒ Object
- #operation(delivery_info) ⇒ Object
- #process_message(delivery_info, payload) ⇒ Object
- #queue ⇒ Object
- #routing_key ⇒ Object
Constructor Details
#initialize(subscription) ⇒ Listener
Returns a new instance of Listener.
20 21 22 23 |
# File 'lib/multiple_man/listener.rb', line 20
# Builds a listener for one subscription: stores the subscription through
# its writer, then sets up the connection via #init_connection.
#
# @param subscription [Object] the subscription this listener serves
def initialize(subscription)
  self.subscription = subscription
  init_connection
end
Instance Attribute Details
#connection ⇒ Object
Returns the value of attribute connection.
31 32 33 |
# File 'lib/multiple_man/listener.rb', line 31
# Reader for the connection attribute.
#
# @return [Object] the current value of @connection (nil until assigned)
def connection
  @connection
end
#subscription ⇒ Object
Returns the value of attribute subscription.
31 32 33 |
# File 'lib/multiple_man/listener.rb', line 31
# Reader for the subscription attribute.
#
# @return [Object] the current value of @subscription (nil until assigned)
def subscription
  @subscription
end
Class Method Details
.start ⇒ Object
8 9 10 11 12 13 14 15 |
# File 'lib/multiple_man/listener.rb', line 8
# Class-level entry point: logs the registered subscriptions (as JSON) at
# debug level, then builds one listener per subscription and calls #listen
# on it.
#
# @return [Object] whatever Subscribers::Registry.subscriptions#each returns
def start
  MultipleMan.logger.debug("Starting listeners.")
  MultipleMan.logger.debug(Subscribers::Registry.subscriptions.to_json)

  Subscribers::Registry.subscriptions.each { |sub| new(sub).listen }
end
Instance Method Details
#handle_error(ex, delivery_info) ⇒ Object
53 54 55 56 57 58 59 |
# File 'lib/multiple_man/listener.rb', line 53
# Handles a failure raised while processing a delivery: logs the error
# message and backtrace, reports the exception through MultipleMan.error,
# and negatively acknowledges the delivery so that it is requeued.
#
# @param ex [Exception] the error raised by the message handler
# @param delivery_info [#delivery_tag] metadata of the failed delivery
# @return [Object] the result of the channel's nack call
def handle_error(ex, delivery_info)
  # The backtrace is interpolated as-is (array inspect form), matching the
  # existing log format.
  MultipleMan.logger.error " Error - #{ex.message}\n\n#{ex.backtrace}"
  MultipleMan.error(ex)

  # Requeue the message
  # NOTE(review): the positional flags are presumably (multiple = false,
  # requeue = true) - confirm against the AMQP client in use.
  queue.channel.nack(delivery_info.delivery_tag, false, true)
end
#init_connection ⇒ Object
25 26 27 28 29 |
# File 'lib/multiple_man/listener.rb', line 25
# Opens a dedicated channel on the shared MultipleMan::Connection.connection,
# sets its prefetch to 100, wraps it in a MultipleMan::Connection and stores
# that wrapper in the connection attribute.
#
# @return [Object] the newly assigned MultipleMan::Connection wrapper
def init_connection
  shared = MultipleMan::Connection.connection
  # NOTE(review): the second create_channel argument is presumably the
  # consumer worker pool size - confirm against the AMQP client in use.
  concurrency = MultipleMan.configuration.worker_concurrency
  amqp_channel = shared.create_channel(nil, concurrency)
  amqp_channel.prefetch(100)

  self.connection = MultipleMan::Connection.new(amqp_channel)
end
#listen ⇒ Object
33 34 35 36 37 38 39 |
# File 'lib/multiple_man/listener.rb', line 33
# Binds this listener's queue to the connection's topic using the
# subscription routing key, then subscribes with acknowledgements enabled
# (ack: true) and hands every delivery to #process_message.
#
# @return [Object] the result of the queue's subscribe call
def listen
  MultipleMan.logger.info "Listening for #{subscription.klass} with routing key #{routing_key}."

  bound_queue = queue.bind(connection.topic, routing_key: routing_key)
  # The second block argument (delivery metadata) is not used here.
  bound_queue.subscribe(ack: true) do |delivery_info, _metadata, payload|
    process_message(delivery_info, payload)
  end
end
#operation(delivery_info) ⇒ Object
61 62 63 |
# File 'lib/multiple_man/listener.rb', line 61
# Derives the operation name from a delivery: the last dot-separated
# segment of its routing key.
#
# @param delivery_info [#routing_key] metadata of the delivery
# @return [String, nil] the final segment, or nil when the key has none
def operation(delivery_info)
  segments = delivery_info.routing_key.split(".")
  segments.last
end
#process_message(delivery_info, payload) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/multiple_man/listener.rb', line 41
# Processes one delivery: parses the JSON payload, dispatches it to the
# subscription method named by #operation, and acknowledges the delivery on
# success. Any failure (including a JSON parse error) is passed to
# #handle_error instead, and the delivery is not acknowledged here.
#
# @param delivery_info [#routing_key, #delivery_tag] metadata of the delivery
# @param payload [String] JSON-encoded message body
# @return [Object] the result of the acknowledge call or of #handle_error
def process_message(delivery_info, payload)
  MultipleMan.logger.info "Processing message for #{delivery_info.routing_key}."

  begin
    # NOTE(review): the method name comes from the routing key of the
    # incoming message; confirm that routing keys are trusted input.
    subscription.send(operation(delivery_info), JSON.parse(payload).with_indifferent_access)
  rescue Exception => ex
    # NOTE(review): rescuing Exception (not only StandardError) sends every
    # failure through handle_error, but it also traps SystemExit/Interrupt
    # raised inside the handler - confirm this is intended.
    handle_error(ex, delivery_info)
  else
    MultipleMan.logger.debug " Successfully processed!"
    # NOTE(review): the second argument is presumably the "multiple" flag -
    # confirm against the AMQP client in use.
    queue.channel.acknowledge(delivery_info.delivery_tag, false)
  end
end
#queue ⇒ Object
65 66 67 |
# File 'lib/multiple_man/listener.rb', line 65
# Looks up this listener's queue on the connection, requesting a durable
# queue that is not auto-deleted.
#
# @return [Object] whatever connection.queue returns for queue_name
def queue
  queue_options = { durable: true, auto_delete: false }
  connection.queue(queue_name, **queue_options)
end
#routing_key ⇒ Object
69 70 71 |
# File 'lib/multiple_man/listener.rb', line 69
# Routing key used for this listener; delegated to the subscription.
#
# @return [Object] the subscription's routing_key
def routing_key
  subscription.routing_key
end