Class: Baton::ConsumerManager
- Inherits: Object
  - Object
  - Baton::ConsumerManager
- Includes: Logging
- Defined in: lib/baton/consumer_manager.rb
Instance Attribute Summary
- #channel ⇒ Object
  Returns the value of attribute channel.
- #consumer ⇒ Object
  Returns the value of attribute consumer.
- #exchange_in ⇒ Object
  Returns the value of attribute exchange_in.
- #exchange_out ⇒ Object
  Returns the value of attribute exchange_out.
Instance Method Summary
- #handle_message(metadata, payload) ⇒ Object
  Public: Triggered whenever a message is received; forwards the message to the consumer’s handle_message.
- #initialize(consumer, channel, exchange_in, exchange_out) ⇒ ConsumerManager (constructor)
  Public: Initializes a ConsumerManager and registers it as an observer of the consumer.
- #start ⇒ Object
  Public: Creates a queue and binds it to the input exchange based on the consumer’s routing key.
- #update(message) ⇒ Object
  Public: Method that is triggered when a consumer notifies with a message.
Methods included from Logging
Constructor Details
#initialize(consumer, channel, exchange_in, exchange_out) ⇒ ConsumerManager
Public: Initializes a ConsumerManager and registers it as an observer of the consumer.
consumer     - An instance of Baton::Consumer
channel      - An AMQP channel
exchange_in  - An input exchange
exchange_out - An output exchange
# File 'lib/baton/consumer_manager.rb', line 14
def initialize(consumer, channel, exchange_in, exchange_out)
  @consumer, @channel, @exchange_in, @exchange_out = consumer, channel, exchange_in, exchange_out
  @consumer.add_observer(self)
  @consumer.consumer_manager = self
end
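The constructor wires the manager and the consumer together in both directions: the manager becomes an observer of the consumer, and the consumer holds a reference back to the manager. The following is a minimal sketch of that wiring, not Baton's own code. `WiringSketchConsumer`, `WiringSketchManager`, and the `notify` method are hypothetical stand-ins that model only the calls visible in the listing above; how Baton::Consumer actually notifies its observers is an assumption here.

```ruby
# Hypothetical stand-in for Baton::Consumer. It models only the two calls
# made by the constructor (add_observer and consumer_manager=), plus an
# assumed notify method that pushes a message to every observer.
class WiringSketchConsumer
  attr_accessor :consumer_manager
  attr_reader :observers

  def initialize
    @observers = []
  end

  def add_observer(observer)
    @observers << observer
  end

  # Assumption: observers are notified through their #update method.
  def notify(message)
    @observers.each { |observer| observer.update(message) }
  end
end

# Hypothetical stand-in for the manager, mirroring the constructor above
# without the channel and exchanges.
class WiringSketchManager
  attr_reader :consumer, :received

  def initialize(consumer)
    @consumer = consumer
    @received = []
    @consumer.add_observer(self)       # manager observes the consumer
    @consumer.consumer_manager = self  # consumer can reach its manager
  end

  # Records each notification instead of publishing it.
  def update(message)
    @received << message
  end
end

consumer = WiringSketchConsumer.new
manager = WiringSketchManager.new(consumer)
consumer.notify({ :type => "info", :message => "done" })
```

After `notify`, `manager.received` holds the message, which is the point where the real class would log and publish it.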
Instance Attribute Details
#channel ⇒ Object
Returns the value of attribute channel.
# File 'lib/baton/consumer_manager.rb', line 6
def channel
  @channel
end
#consumer ⇒ Object
Returns the value of attribute consumer.
# File 'lib/baton/consumer_manager.rb', line 6
def consumer
  @consumer
end
#exchange_in ⇒ Object
Returns the value of attribute exchange_in.
# File 'lib/baton/consumer_manager.rb', line 6
def exchange_in
  @exchange_in
end
#exchange_out ⇒ Object
Returns the value of attribute exchange_out.
# File 'lib/baton/consumer_manager.rb', line 6
def exchange_out
  @exchange_out
end
Instance Method Details
#handle_message(metadata, payload) ⇒ Object
Public: Triggered whenever a message is received and forwards the message to the consumer’s handle_message.
metadata - A metadata structure such as OpenStruct
payload  - A JSON message
Examples
handle_message(metadata, "{\"message\":\"a message\",\"type\":\"a type\"}")
Returns nothing.
# File 'lib/baton/consumer_manager.rb', line 42
def handle_message(metadata, payload)
  logger.debug "Received #{payload}, content_type = #{metadata.content_type}"
  consumer.handle_message(payload)
end
#start ⇒ Object
Public: Creates a queue and binds it to the input exchange based on the consumer’s routing key. Also adds handle_message as a callback method to queue.subscribe().
Returns nothing.
# File 'lib/baton/consumer_manager.rb', line 24
def start
  queue = channel.queue("", :exclusive => true, :auto_delete => true)
  queue.bind(exchange_in, :routing_key => consumer.routing_key)
  queue.subscribe(&method(:handle_message))
  logger.info "Bind queue with routing key '#{consumer.routing_key}' to exchange '#{exchange_in.name}', waiting for messages..."
end
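The `queue.subscribe(&method(:handle_message))` line turns the instance method into a block, so each delivery calls `handle_message(metadata, payload)` on the manager. The sketch below illustrates that idiom with an in-memory queue and no broker. `StartSketchQueue`, `StartSketchManager`, `SketchMetadata`, and the `deliver` helper are hypothetical names introduced for illustration; a `Struct` stands in for the metadata object, and the real AMQP queue has a much larger interface.

```ruby
# Hypothetical in-memory queue that models only bind and subscribe.
class StartSketchQueue
  attr_reader :bindings

  def initialize
    @bindings = []
    @subscriber = nil
  end

  def bind(exchange, options = {})
    @bindings << [exchange, options[:routing_key]]
    self
  end

  def subscribe(&block)
    @subscriber = block
    self
  end

  # Simulates the broker delivering one message to the subscriber.
  def deliver(metadata, payload)
    @subscriber.call(metadata, payload)
  end
end

# Stand-in for the metadata structure; only content_type is modelled.
SketchMetadata = Struct.new(:content_type)

# Hypothetical manager that takes the queue directly instead of a channel.
class StartSketchManager
  attr_reader :handled

  def initialize(queue, exchange_in, routing_key)
    @queue, @exchange_in, @routing_key = queue, exchange_in, routing_key
    @handled = []
  end

  def start
    @queue.bind(@exchange_in, :routing_key => @routing_key)
    # method(:handle_message) returns a Method object; & converts it to a
    # block, so the queue ends up calling handle_message on this instance.
    @queue.subscribe(&method(:handle_message))
  end

  # Records the delivery instead of forwarding it to a consumer.
  def handle_message(metadata, payload)
    @handled << [metadata.content_type, payload]
  end
end

queue = StartSketchQueue.new
manager = StartSketchManager.new(queue, "exchange_in", "worker.key")
manager.start
queue.deliver(SketchMetadata.new("application/json"), "{\"type\":\"a type\"}")
```

Passing a bound method keeps the callback logic in a named, separately testable method rather than an inline block.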
#update(message) ⇒ Object
Public: Triggered when a consumer notifies with a message. Logs the message (at error level when its type is "error", otherwise at info level) and publishes it to the output exchange as JSON.
message - A general message (Hash, String, etc)
Examples
update("A message")
Returns nothing.
# File 'lib/baton/consumer_manager.rb', line 57
def update(message)
  case message.fetch(:type){""}
  when "error"
    logger.error message
  else
    logger.info message
  end
  exchange_out.publish(message.to_json)
end
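The listing selects a log level from the message's `:type` key and then publishes the message as JSON. Because it calls `fetch(:type)`, the sketch below passes Hashes; `fetch` with a block returns the block's value (`""`) when the key is missing, so untyped messages fall through to the info branch. `UpdateSketchExchange`, `UpdateSketchLogger`, and `sketch_update` are hypothetical stand-ins that record calls instead of talking to a broker or a real logger.

```ruby
require "json"

# Hypothetical exchange that records published bodies.
class UpdateSketchExchange
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(body)
    @published << body
  end
end

# Hypothetical logger that records the level and message of each call.
class UpdateSketchLogger
  attr_reader :entries

  def initialize
    @entries = []
  end

  def error(message)
    @entries << [:error, message]
  end

  def info(message)
    @entries << [:info, message]
  end
end

# Mirrors the body of #update, with the logger and exchange passed in.
def sketch_update(message, logger, exchange_out)
  case message.fetch(:type) { "" }
  when "error"
    logger.error(message)
  else
    logger.info(message)
  end
  exchange_out.publish(message.to_json)
end

logger = UpdateSketchLogger.new
exchange = UpdateSketchExchange.new
sketch_update({ :type => "error", :message => "boom" }, logger, exchange)
sketch_update({ :message => "ok" }, logger, exchange)
```

Both messages are published regardless of type; only the log level differs.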