Class: DaemonObjects::Amqp::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/daemon_objects/amqp/worker.rb

Constant Summary collapse

DEFAULTS =
{
  :queue_name  => AMQ::Protocol::EMPTY_STRING,
  :exchange    => nil,
  :routing_key => AMQ::Protocol::EMPTY_STRING,
  :arguments   => nil
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(channel, consumer, options = {}) ⇒ Worker

Returns a new instance of Worker.



11
12
13
14
15
16
# File 'lib/daemon_objects/amqp/worker.rb', line 11

def initialize(channel, consumer, options={})
  self.consumer = consumer
  self.channel  = channel

  parse_options(DEFAULTS.merge(options))
end

Instance Attribute Details

#argumentsObject

Returns the value of attribute arguments.



2
3
4
# File 'lib/daemon_objects/amqp/worker.rb', line 2

def arguments
  @arguments
end

#channelObject

Returns the value of attribute channel.



2
3
4
# File 'lib/daemon_objects/amqp/worker.rb', line 2

def channel
  @channel
end

#consumerObject

Returns the value of attribute consumer.



2
3
4
# File 'lib/daemon_objects/amqp/worker.rb', line 2

def consumer
  @consumer
end

#exchangeObject

Returns the value of attribute exchange.



2
3
4
# File 'lib/daemon_objects/amqp/worker.rb', line 2

def exchange
  @exchange
end

#loggerObject

Returns the value of attribute logger.



2
3
4
# File 'lib/daemon_objects/amqp/worker.rb', line 2

def logger
  @logger
end

#queue_nameObject

Returns the value of attribute queue_name.



2
3
4
# File 'lib/daemon_objects/amqp/worker.rb', line 2

def queue_name
  @queue_name
end

#routing_keyObject

Returns the value of attribute routing_key.



2
3
4
# File 'lib/daemon_objects/amqp/worker.rb', line 2

def routing_key
  @routing_key
end

Instance Method Details

#handle_channel_exception(channel, channel_close) ⇒ Object

Raises:

  • (StandardError)


45
46
47
# File 'lib/daemon_objects/amqp/worker.rb', line 45

def handle_channel_exception(channel, channel_close)
  raise StandardError, "ERROR channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}"
end

#handle_message(channel, delivery_tag, payload) ⇒ Object



49
50
51
52
53
54
55
56
57
# File 'lib/daemon_objects/amqp/worker.rb', line 49

def handle_message(channel, delivery_tag, payload)
  response = consumer.handle_message (payload)
  channel.acknowledge(delivery_tag, true)
  response
rescue Exception => e
  channel.reject(delivery_tag)
  logger.error "Error occurred handling message, the payload was: #{payload}, the error was: '#{e}'."
  e
end

#parse_options(options) ⇒ Object



18
19
20
21
22
# File 'lib/daemon_objects/amqp/worker.rb', line 18

def parse_options(options)
  options.each do |k,v|
    self.send("#{k}=", v) if self.respond_to?("#{k}=")
  end
end

#startObject



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/daemon_objects/amqp/worker.rb', line 29

def start
  queue = channel.queue(queue_name, :durable => true, :arguments => arguments)
  queue.bind(exchange, :routing_key => routing_key) if exchange

  queue.subscribe(:block => true, :ack => true) do |delivery_info, properties, payload|
    exception = handle_message(channel, delivery_info.delivery_tag, payload)

    response_payload = consumer.get_response(payload, exception) if consumer.respond_to?(:get_response)
    if response_payload
      channel.default_exchange.publish(response_payload.to_json, 
                                       :routing_key    => properties.reply_to, 
                                       :correlation_id => properties.message_id)
    end
  end
end