Class: DaemonObjects::Amqp::Worker
- Inherits:
-
Object
- Object
- DaemonObjects::Amqp::Worker
- Defined in:
- lib/daemon_objects/amqp/worker.rb
Constant Summary collapse
- DEFAULTS =
{ :queue_name => AMQ::Protocol::EMPTY_STRING, :exchange => nil, :routing_key => AMQ::Protocol::EMPTY_STRING, :arguments => nil }
Instance Attribute Summary collapse
-
#arguments ⇒ Object
Returns the value of attribute arguments.
-
#channel ⇒ Object
Returns the value of attribute channel.
-
#consumer ⇒ Object
Returns the value of attribute consumer.
-
#exchange ⇒ Object
Returns the value of attribute exchange.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#queue_name ⇒ Object
Returns the value of attribute queue_name.
-
#routing_key ⇒ Object
Returns the value of attribute routing_key.
Instance Method Summary collapse
- #handle_channel_exception(channel, channel_close) ⇒ Object
- #handle_message(channel, delivery_tag, payload) ⇒ Object
-
#initialize(channel, consumer, options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #parse_options(options) ⇒ Object
- #start ⇒ Object
Constructor Details
Instance Attribute Details
#arguments ⇒ Object
Returns the value of attribute arguments.
2 3 4 |
# File 'lib/daemon_objects/amqp/worker.rb', line 2 def arguments @arguments end |
#channel ⇒ Object
Returns the value of attribute channel.
2 3 4 |
# File 'lib/daemon_objects/amqp/worker.rb', line 2 def channel @channel end |
#consumer ⇒ Object
Returns the value of attribute consumer.
2 3 4 |
# File 'lib/daemon_objects/amqp/worker.rb', line 2 def consumer @consumer end |
#exchange ⇒ Object
Returns the value of attribute exchange.
2 3 4 |
# File 'lib/daemon_objects/amqp/worker.rb', line 2 def exchange @exchange end |
#logger ⇒ Object
Returns the value of attribute logger.
2 3 4 |
# File 'lib/daemon_objects/amqp/worker.rb', line 2 def logger @logger end |
#queue_name ⇒ Object
Returns the value of attribute queue_name.
2 3 4 |
# File 'lib/daemon_objects/amqp/worker.rb', line 2 def queue_name @queue_name end |
#routing_key ⇒ Object
Returns the value of attribute routing_key.
2 3 4 |
# File 'lib/daemon_objects/amqp/worker.rb', line 2 def routing_key @routing_key end |
Instance Method Details
#handle_channel_exception(channel, channel_close) ⇒ Object
45 46 47 |
# File 'lib/daemon_objects/amqp/worker.rb', line 45 def handle_channel_exception(channel, channel_close) raise StandardError, "ERROR channel-level exception: code = #{channel_close.reply_code}, message = #{channel_close.reply_text}" end |
#handle_message(channel, delivery_tag, payload) ⇒ Object
49 50 51 52 53 54 55 56 57 |
# File 'lib/daemon_objects/amqp/worker.rb', line 49 def (channel, delivery_tag, payload) response = consumer. (payload) channel.acknowledge(delivery_tag, true) response rescue Exception => e channel.reject(delivery_tag) logger.error "Error occurred handling message, the payload was: #{payload}, the error was: '#{e}'." e end |
#parse_options(options) ⇒ Object
18 19 20 21 22 |
# File 'lib/daemon_objects/amqp/worker.rb', line 18 def () .each do |k,v| self.send("#{k}=", v) if self.respond_to?("#{k}=") end end |
#start ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/daemon_objects/amqp/worker.rb', line 29 def start queue = channel.queue(queue_name, :durable => true, :arguments => arguments) queue.bind(exchange, :routing_key => routing_key) if exchange queue.subscribe(:block => true, :ack => true) do |delivery_info, properties, payload| exception = (channel, delivery_info.delivery_tag, payload) response_payload = consumer.get_response(payload, exception) if consumer.respond_to?(:get_response) if response_payload channel.default_exchange.publish(response_payload.to_json, :routing_key => properties.reply_to, :correlation_id => properties.) end end end |