Module: PipelineToolkit::Amqp::Reader
Overview
The Reader provides the functionality needed to receive messages asynchronously by subscribing to a queue on the AMQP server.
Instance Attribute Summary
Attributes included from Abstract
Instance Method Summary
-
#initialize_reader ⇒ Object
Initialize the AMQP connection, channel, exchange, and queue binding needed to handle messages from the server.
-
#perform_acknowledgement(ack_id) ⇒ Object
Perform the acknowledgement on the stored header to tell the AMQP server we are done with the message.
-
#queue_subscribe ⇒ Object
Subscribe the queue to receive messages.
-
#store_acknowledgement(message, header) ⇒ Object
Store the acknowledgement header.
Methods included from Abstract
#bind_queue, #initialize, #initialize_channel, #initialize_connection, #initialize_exchange, #initialize_queue, #stop_connection
Instance Method Details
#initialize_reader ⇒ Object
Initialize the AMQP connection, channel, exchange, and queue binding needed to handle messages from the server.
    # File 'lib/pipeline_toolkit/amqp/reader.rb', line 17
    def initialize_reader
      DefaultLogger.debug("Amqp::Reader#initialize_reader")
      @ack_headers ||= {}
      initialize_connection
      initialize_channel
      # NB: Used to be 1 to prevent deadlocks. Think deadlocks won't happen anymore (because doing async
      # reads now), but keep an eye on this parameter.
      @channel.prefetch(25)
      initialize_exchange
      initialize_queue
      bind_queue()
    end
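The `@channel.prefetch(25)` call caps how many unacknowledged messages the server will deliver to the consumer at once. As a rough illustration only, the sketch below simulates that window in plain Ruby. `PrefetchSimulation` is a hypothetical class written for this example; it is not part of PipelineToolkit or the amqp gem, and it models nothing beyond the counting behaviour.

```ruby
# Hypothetical stand-in for a broker queue that honours a prefetch limit.
class PrefetchSimulation
  attr_reader :in_flight

  def initialize(messages, prefetch)
    @pending   = messages.dup # messages still waiting on the broker
    @prefetch  = prefetch     # maximum number of unacknowledged deliveries
    @in_flight = []           # delivered but not yet acknowledged
  end

  # Deliver pending messages until the unacknowledged window is full.
  def deliver
    while @in_flight.size < @prefetch && !@pending.empty?
      @in_flight << @pending.shift
    end
    @in_flight.dup
  end

  # Acknowledging a message frees one slot in the window.
  def ack(message)
    @in_flight.delete(message)
  end
end

sim = PrefetchSimulation.new((1..60).to_a, 25)
sim.deliver
first_window = sim.in_flight.size # window is full, nothing more arrives

sim.ack(1)                        # one acknowledgement frees one slot
sim.deliver
newest = sim.in_flight.last       # the next pending message was delivered
```

With a limit of 1 the consumer would receive nothing new until each message is acknowledged, which is the stricter behaviour the source comment says was used previously.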
#perform_acknowledgement(ack_id) ⇒ Object
Perform the acknowledgement on the stored header to tell the AMQP server we are done with the message.
    # File 'lib/pipeline_toolkit/amqp/reader.rb', line 61
    def perform_acknowledgement(ack_id)
      DefaultLogger.debug("Amqp::Reader#perform_acknowledgement(ack_id)")
      header = @ack_headers.delete(ack_id)
      header.ack
    end
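Because `Hash#delete` returns `nil` for a missing key, calling `perform_acknowledgement` with an unknown or already-acknowledged `ack_id` would raise `NoMethodError` on `header.ack`. The sketch below shows one possible guard. `safe_acknowledge` and `FakeHeader` are hypothetical names introduced for this example, and `FakeHeader` models only the `#ack` call, not the real AMQP header object.

```ruby
# Stand-in for the AMQP header; only #ack is modelled (assumption).
FakeHeader = Struct.new(:delivery_tag, :acked) do
  def ack
    self.acked = true
  end
end

# Hypothetical guarded variant of perform_acknowledgement.
# Returns true if a stored header was acknowledged, false otherwise.
def safe_acknowledge(ack_headers, ack_id)
  header = ack_headers.delete(ack_id)
  return false if header.nil? # unknown or already acknowledged id
  header.ack
  true
end

stored  = FakeHeader.new(7, false)
headers = { "7" => stored }

first  = safe_acknowledge(headers, "7") # acknowledges and removes the header
second = safe_acknowledge(headers, "7") # nothing left to acknowledge
```

Whether a missing header should be ignored, logged, or treated as an error depends on the pipeline; the guard here simply reports it through the return value.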
#queue_subscribe ⇒ Object
Subscribe the queue to receive messages.
    # File 'lib/pipeline_toolkit/amqp/reader.rb', line 33
    def queue_subscribe
      DefaultLogger.debug("Amqp::Reader#queue_subscribe")
      unless AMQP.closing?
        @queue.subscribe(:ack => [:ack]) do |header, body|
          DefaultLogger.debug("Rabbit Message: #{body}")
          (header, body)
        end
      end
    end
#store_acknowledgement(message, header) ⇒ Object
Store the acknowledgement header.
    # File 'lib/pipeline_toolkit/amqp/reader.rb', line 49
    def store_acknowledgement(message, header)
      DefaultLogger.debug("Amqp::Reader#store_acknowledgement(message, header)")
      message[:ack_id] = header.delivery_tag.to_s
      @ack_headers[message[:ack_id]] = header
    end
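The bookkeeping performed by `store_acknowledgement` can be sketched in isolation as follows. This is an illustration under assumptions: `StubHeader` is a hypothetical stand-in exposing only `delivery_tag`, and the message is assumed to be a plain Hash.

```ruby
# Stand-in for the AMQP header; only #delivery_tag is modelled (assumption).
StubHeader = Struct.new(:delivery_tag)

ack_headers = {}                    # plays the role of @ack_headers
message     = { :body => "payload" } # assumed Hash-like message
header      = StubHeader.new(42)

# Same two steps as store_acknowledgement: tag the message, index the header.
message[:ack_id] = header.delivery_tag.to_s
ack_headers[message[:ack_id]] = header

by_string  = ack_headers["42"] # the stored header
by_integer = ack_headers[42]   # nil, because the key is a String
```

Since the delivery tag is converted with `to_s`, the `ack_id` later passed to `perform_acknowledgement` has to be the String form; an Integer tag would not find the stored header.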