Class: AMQP::Consumer
- Inherits:
-
AMQ::Client::Async::Consumer
- Object
- AMQ::Client::Async::Consumer
- AMQP::Consumer
- Defined in:
- lib/amqp/consumer.rb
Overview
AMQP consumers are entities that handle messages delivered to them (“push API” as opposed to “pull API”) by AMQP broker. Every consumer is associated with a queue. Consumers can be exclusive (no other consumers can be registered for the same queue) or not (consumers share the queue). In the case of multiple consumers per queue, messages are distributed in round robin manner with respect to channel-level prefetch setting).
Instance Attribute Summary collapse
-
#arguments ⇒ Hash
readonly
Custom subscription metadata.
-
#channel ⇒ AMQP::Channel
readonly
Channel this consumer uses.
-
#consumer_tag ⇒ String
readonly
Consumer tag, unique consumer identifier.
-
#queue ⇒ AMQP::Queue
readonly
Queue messages are consumed from.
Acknowledging & Rejecting Messages collapse
-
#acknowledge(delivery_tag) ⇒ Consumer
Acknowledge a delivery tag.
-
#reject(delivery_tag, requeue = true) ⇒ Consumer
Self.
Error Handling & Recovery collapse
-
#auto_recover ⇒ Object
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
-
#before_recovery(&block) ⇒ Object
Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened.
-
#on_connection_interruption(&block) ⇒ Object
(also: #after_connection_interruption)
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
-
#on_recovery(&block) ⇒ Object
(also: #after_recovery)
Defines a callback that will be executed when AMQP connection is recovered after a network failure..
Class Method Summary collapse
-
.tag_generator ⇒ AMQ::Client::ConsumerTagGenerator
Consumer tag generator.
-
.tag_generator=(generator) ⇒ AMQ::Client::ConsumerTagGenerator
Provided argument.
Instance Method Summary collapse
-
#cancel(nowait = false, &block) ⇒ AMQP::Consumer
Self.
-
#consume(nowait = false, &block) ⇒ AMQP::Consumer
Begin consuming messages from the queue.
-
#exclusive? ⇒ Boolean
True if this consumer is exclusive (other consumers for the same queue are not allowed).
-
#initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false) ⇒ Consumer
constructor
A new instance of Consumer.
-
#inspect ⇒ String
Readable representation of relevant object state.
-
#on_delivery(&block) ⇒ AMQP::Consumer
Register a block that will be used to handle delivered messages.
-
#resubscribe(&block) ⇒ AMQP::Consumer
Used by automatic recovery code.
-
#subscribed? ⇒ Boolean
Queue API compatibility.
Constructor Details
#initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false) ⇒ Consumer
Returns a new instance of Consumer.
42 43 44 |
# File 'lib/amqp/consumer.rb', line 42 def initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false) super(channel, queue, (consumer_tag || self.class.tag_generator.generate_for(queue)), exclusive, no_ack, arguments, no_local) end |
Instance Attribute Details
#arguments ⇒ Hash (readonly)
Returns Custom subscription metadata.
27 28 29 |
# File 'lib/amqp/consumer.rb', line 27 def arguments @arguments end |
#channel ⇒ AMQP::Channel (readonly)
Returns Channel this consumer uses.
21 22 23 |
# File 'lib/amqp/consumer.rb', line 21 def channel @channel end |
#consumer_tag ⇒ String (readonly)
Returns Consumer tag, unique consumer identifier.
25 26 27 |
# File 'lib/amqp/consumer.rb', line 25 def consumer_tag @consumer_tag end |
#queue ⇒ AMQP::Queue (readonly)
Returns Queue messages are consumed from.
23 24 25 |
# File 'lib/amqp/consumer.rb', line 23 def queue @queue end |
Class Method Details
.tag_generator ⇒ AMQ::Client::ConsumerTagGenerator
Returns Consumer tag generator.
31 32 33 |
# File 'lib/amqp/consumer.rb', line 31 def self.tag_generator @tag_generator ||= AMQ::Client::ConsumerTagGenerator.new end |
.tag_generator=(generator) ⇒ AMQ::Client::ConsumerTagGenerator
Returns Provided argument.
37 38 39 |
# File 'lib/amqp/consumer.rb', line 37 def self.tag_generator=(generator) @tag_generator = generator end |
Instance Method Details
#acknowledge(delivery_tag) ⇒ Consumer
Acknowledge a delivery tag.
143 144 145 |
# File 'lib/amqp/consumer.rb', line 143 def acknowledge(delivery_tag) super(delivery_tag) end |
#auto_recover ⇒ Object
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
194 195 196 |
# File 'lib/amqp/consumer.rb', line 194 def auto_recover super end |
#before_recovery(&block) ⇒ Object
Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).
176 177 178 |
# File 'lib/amqp/consumer.rb', line 176 def before_recovery(&block) super(&block) end |
#cancel(nowait = false, &block) ⇒ AMQP::Consumer
Returns self.
83 84 85 86 87 88 89 90 91 |
# File 'lib/amqp/consumer.rb', line 83 def cancel(nowait = false, &block) @channel.once_open do @queue.once_declared do super(nowait, &block) end end self end |
#consume(nowait = false, &block) ⇒ AMQP::Consumer
Begin consuming messages from the queue
55 56 57 58 59 60 61 62 63 |
# File 'lib/amqp/consumer.rb', line 55 def consume(nowait = false, &block) @channel.once_open do @queue.once_declared do super(nowait, &block) end end self end |
#exclusive? ⇒ Boolean
Returns true if this consumer is exclusive (other consumers for the same queue are not allowed).
47 48 49 |
# File 'lib/amqp/consumer.rb', line 47 def exclusive? super end |
#inspect ⇒ String
Returns Readable representation of relevant object state.
202 203 204 |
# File 'lib/amqp/consumer.rb', line 202 def inspect "#<AMQP::Consumer:#{@consumer_tag}> queue=#{@queue.name} channel=#{@channel.id} callbacks=#{@callbacks.inspect}" end |
#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).
165 166 167 |
# File 'lib/amqp/consumer.rb', line 165 def on_connection_interruption(&block) super(&block) end |
#on_delivery(&block) ⇒ AMQP::Consumer
Register a block that will be used to handle delivered messages.
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/amqp/consumer.rb', line 115 def on_delivery(&block) # We have to maintain this multiple arities jazz # because older versions this gem are used in examples in at least 3 # books published by O'Reilly :(. MK. delivery_shim = Proc.new { |basic_deliver, headers, payload| case block.arity when 1 then block.call(payload) when 2 then h = Header.new(@channel, basic_deliver, headers.decode_payload) block.call(h, payload) else h = Header.new(@channel, basic_deliver, headers.decode_payload) block.call(h, payload, basic_deliver.consumer_tag, basic_deliver.delivery_tag, basic_deliver.redelivered, basic_deliver.exchange, basic_deliver.routing_key) end } super(&delivery_shim) end |
#on_recovery(&block) ⇒ Object Also known as: after_recovery
Defines a callback that will be executed when AMQP connection is recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).
184 185 186 |
# File 'lib/amqp/consumer.rb', line 184 def on_recovery(&block) super(&block) end |
#reject(delivery_tag, requeue = true) ⇒ Consumer
Returns self.
152 153 154 |
# File 'lib/amqp/consumer.rb', line 152 def reject(delivery_tag, requeue = true) super(delivery_tag, requeue) end |
#resubscribe(&block) ⇒ AMQP::Consumer
Used by automatic recovery code.
68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/amqp/consumer.rb', line 68 def resubscribe(&block) @channel.once_open do @queue.once_declared do self.unregister_with_channel @consumer_tag = self.class.tag_generator.generate_for(@queue) self.register_with_channel super(&block) end end self end |
#subscribed? ⇒ Boolean
Queue API compatibility.
97 98 99 |
# File 'lib/amqp/consumer.rb', line 97 def subscribed? !@callbacks[:delivery].empty? end |