Class: Baton::ConsumerManager

Inherits:
Object
Includes:
Logging
Defined in:
lib/baton/consumer_manager.rb

Instance Attribute Summary

Instance Method Summary

Methods included from Logging

#logger, logger, logger=

Constructor Details

#initialize(consumer, channel, exchange_in, exchange_out) ⇒ ConsumerManager

Public: Initializes a ConsumerManager, registers it as an observer of the consumer, and sets it as the consumer's consumer_manager.

consumer     - An instance of Baton::Consumer
channel      - An AMQP channel
exchange_in  - An input exchange
exchange_out - An output exchange



# File 'lib/baton/consumer_manager.rb', line 14

def initialize(consumer, channel, exchange_in, exchange_out)
  @consumer, @channel, @exchange_in, @exchange_out = consumer, channel, exchange_in, exchange_out
  @consumer.add_observer(self)
  @consumer.consumer_manager = self
end
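
The wiring done by the constructor can be tried without an AMQP broker. What follows is a minimal sketch, not the library's code: `StubConsumer` and `SketchManager` are hypothetical stand-ins that mimic only the two calls the constructor makes (`add_observer` and `consumer_manager=`), and symbols stand in for the real channel and exchanges.

```ruby
# Hypothetical stand-in for Baton::Consumer: it only records its observers
# and the manager assigned to it.
class StubConsumer
  attr_accessor :consumer_manager
  attr_reader :observers

  def initialize
    @observers = []
  end

  def add_observer(observer)
    @observers << observer
  end
end

# Hypothetical manager that mirrors the constructor shown above.
class SketchManager
  attr_reader :consumer, :channel, :exchange_in, :exchange_out

  def initialize(consumer, channel, exchange_in, exchange_out)
    @consumer, @channel, @exchange_in, @exchange_out = consumer, channel, exchange_in, exchange_out
    @consumer.add_observer(self)       # the manager will receive update(message)
    @consumer.consumer_manager = self  # the consumer can reach back to its manager
  end
end

consumer = StubConsumer.new
# Symbols are placeholders for the real channel and exchanges.
manager = SketchManager.new(consumer, :channel, :exchange_in, :exchange_out)
```

After construction the link is bidirectional: the consumer holds the manager both as an observer and as its `consumer_manager`.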

Instance Attribute Details

#channel ⇒ Object

Returns the value of attribute channel.



# File 'lib/baton/consumer_manager.rb', line 6

def channel
  @channel
end

#consumer ⇒ Object

Returns the value of attribute consumer.



# File 'lib/baton/consumer_manager.rb', line 6

def consumer
  @consumer
end

#exchange_in ⇒ Object

Returns the value of attribute exchange_in.



# File 'lib/baton/consumer_manager.rb', line 6

def exchange_in
  @exchange_in
end

#exchange_out ⇒ Object

Returns the value of attribute exchange_out.



# File 'lib/baton/consumer_manager.rb', line 6

def exchange_out
  @exchange_out
end

Instance Method Details

#handle_message(metadata, payload) ⇒ Object

Public: Called whenever a message is received. Logs the payload and its content type, then forwards the payload to the consumer’s handle_message.

metadata - A metadata structure such as OpenStruct
payload  - A JSON message

Examples

handle_message(metadata, "{\"message\":\"a message\",\"type\":\"a type\"}")

Returns nothing.



# File 'lib/baton/consumer_manager.rb', line 42

def handle_message(metadata, payload)
  logger.debug "Received #{payload}, content_type = #{metadata.content_type}"
  consumer.handle_message(payload)
end
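
The forwarding behaviour can be sketched in isolation. The names below (`Metadata`, `RecordingConsumer`, `log_lines`, `handle`) are hypothetical and exist only for illustration; a `Struct` stands in for the metadata object and an array stands in for the logger.

```ruby
# Hypothetical metadata object; only content_type is read.
Metadata = Struct.new(:content_type)

# Hypothetical consumer that records the payloads it receives.
class RecordingConsumer
  attr_reader :payloads

  def initialize
    @payloads = []
  end

  def handle_message(payload)
    @payloads << payload
  end
end

log_lines = []  # stands in for logger.debug output
recording_consumer = RecordingConsumer.new

# Same shape as handle_message above: log first, then forward only the payload.
handle = lambda do |metadata, payload|
  log_lines << "Received #{payload}, content_type = #{metadata.content_type}"
  recording_consumer.handle_message(payload)
end

payload = "{\"message\":\"a message\",\"type\":\"a type\"}"
handle.call(Metadata.new("application/json"), payload)
```

Note that the metadata is used only for logging; the consumer receives the raw payload and nothing else.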

#start ⇒ Object

Public: Creates an exclusive, auto-delete queue and binds it to the input exchange using the consumer’s routing key. It then subscribes to the queue with handle_message as the callback.

Returns nothing.



# File 'lib/baton/consumer_manager.rb', line 24

def start
  queue = channel.queue("", :exclusive => true, :auto_delete => true)
  queue.bind(exchange_in, :routing_key => consumer.routing_key)
  queue.subscribe(&method(:handle_message))
  logger.info "Bind queue with routing key '#{consumer.routing_key}' to exchange '#{exchange_in.name}', waiting for messages..."
end
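
The `&method(:handle_message)` idiom used in `start` converts a bound method into a block. The sketch below illustrates it with an in-memory queue; `FakeQueue`, `SketchSubscriber`, the `deliver` helper, and the `"example.key"` routing key are all hypothetical and only loosely imitate an AMQP queue.

```ruby
# Hypothetical in-memory queue: it stores the subscriber block and later
# invokes it with (metadata, payload).
class FakeQueue
  attr_reader :bindings

  def initialize
    @bindings = []
    @subscriber = nil
  end

  def bind(exchange, options = {})
    @bindings << [exchange, options[:routing_key]]
  end

  def subscribe(&block)
    @subscriber = block
  end

  # Illustration-only helper, not part of any real API: simulate a delivery.
  def deliver(metadata, payload)
    @subscriber.call(metadata, payload)
  end
end

class SketchSubscriber
  attr_reader :received

  def initialize(queue)
    @queue = queue
    @received = []
  end

  def start
    @queue.bind(:exchange_in, :routing_key => "example.key")
    # method(:handle_message) returns a Method object; & turns it into a block.
    @queue.subscribe(&method(:handle_message))
  end

  def handle_message(metadata, payload)
    @received << [metadata, payload]
  end
end

queue = FakeQueue.new
subscriber = SketchSubscriber.new(queue)
subscriber.start
queue.deliver(:metadata, "{\"type\":\"a type\"}")
```

Passing the method as a block keeps the callback logic in a named, separately testable method rather than an inline block.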

#update(message) ⇒ Object

Public: Called when the consumer notifies its observers with a message. Logs the message (at error level when its :type is "error", at info level otherwise) and publishes it to the output exchange as JSON.

message - A Hash-like message that responds to fetch and to_json

Examples

update({:type => "error", :message => "A message"})

Returns nothing.



# File 'lib/baton/consumer_manager.rb', line 57

def update(message)
  case message.fetch(:type){""}
  when "error"
    logger.error message
  else
    logger.info message
  end
  exchange_out.publish(message.to_json)
end
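
The type-based dispatch in `update` can be sketched as follows. This is an illustration under stated assumptions rather than the library's code: the `log_entries` and `published` arrays are hypothetical stand-ins for the logger and the output exchange, and the messages are plain Hashes with symbol keys.

```ruby
require 'json'

log_entries = []  # [level, message] pairs instead of a real logger
published = []    # JSON strings that would go to the output exchange

# Same shape as update above: choose a log level from :type, then publish JSON.
update = lambda do |message|
  case message.fetch(:type) { "" }  # a missing :type falls back to ""
  when "error"
    log_entries << [:error, message]
  else
    log_entries << [:info, message]
  end
  published << message.to_json
end

update.call({ :type => "error", :message => "boom" })
update.call({ :message => "no type given" })
```

Because `fetch` is given a fallback block, a message without a `:type` key is logged at info level instead of raising `KeyError`.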