Class: AMQP::Consumer
- Inherits:
-
Object
- Object
- AMQP::Consumer
- Extended by:
- ProtocolMethodHandlers
- Includes:
- Callbacks
- Defined in:
- lib/amqp/consumer.rb
Overview
AMQP consumers are entities that handle messages delivered to them (“push API” as opposed to “pull API”) by AMQP broker. Every consumer is associated with a queue. Consumers can be exclusive (no other consumers can be registered for the same queue) or not (consumers share the queue). In the case of multiple consumers per queue, messages are distributed in round robin manner with respect to channel-level prefetch setting).
Instance Attribute Summary collapse
-
#arguments ⇒ Hash
readonly
Custom subscription metadata.
-
#channel ⇒ AMQP::Channel
readonly
Channel this consumer uses.
-
#consumer_tag ⇒ String
readonly
Consumer tag, unique consumer identifier.
-
#queue ⇒ AMQP::Queue
readonly
Queue messages are consumed from.
Acknowledging & Rejecting Messages collapse
-
#acknowledge(delivery_tag) ⇒ Consumer
Acknowledge a delivery tag.
-
#on_recovery(&block) ⇒ Object
(also: #after_recovery)
Defines a callback that will be executed when AMQP connection is recovered after a network failure..
-
#reject(delivery_tag, requeue = true) ⇒ Consumer
Self.
Error Handling & Recovery collapse
-
#auto_recover ⇒ Object
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
-
#before_recovery(&block) ⇒ Object
Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened.
-
#on_connection_interruption(&block) ⇒ Object
(also: #after_connection_interruption)
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
Class Method Summary collapse
-
.tag_generator ⇒ AMQP::ConsumerTagGenerator
Consumer tag generator.
-
.tag_generator=(generator) ⇒ AMQP::ConsumerTagGenerator
Provided argument.
Instance Method Summary collapse
-
#cancel(nowait = false, &block) ⇒ AMQP::Consumer
Self.
-
#consume(nowait = false, &block) ⇒ AMQP::Consumer
Begin consuming messages from the queue.
-
#exclusive? ⇒ Boolean
True if this consumer is exclusive (other consumers for the same queue are not allowed).
-
#handle_cancel(basic_cancel) ⇒ Object
on_cancel(&block).
-
#handle_cancel_ok(cancel_ok) ⇒ Object
handle_consume_ok(consume_ok).
-
#handle_consume_ok(consume_ok) ⇒ Object
handle_delivery(basic_deliver, metadata, payload).
-
#handle_delivery(basic_deliver, metadata, payload) ⇒ Object
Implementation.
-
#initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false, &block) ⇒ Consumer
constructor
A new instance of Consumer.
-
#inspect ⇒ String
Readable representation of relevant object state.
- #on_cancel(&block) ⇒ Object
-
#on_delivery(&block) ⇒ AMQP::Consumer
Register a block that will be used to handle delivered messages.
- #register_with_channel ⇒ Object protected
-
#register_with_queue ⇒ Object
protected
register_with_channel.
-
#resubscribe(&block) ⇒ AMQP::Consumer
Used by automatic recovery code.
-
#subscribed? ⇒ Boolean
Queue API compatibility.
- #to_s ⇒ Object
-
#unregister_with_channel ⇒ Object
protected
register_with_queue.
-
#unregister_with_queue ⇒ Object
protected
register_with_channel.
Methods included from ProtocolMethodHandlers
Methods included from Callbacks
#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback
Constructor Details
#initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false, &block) ⇒ Consumer
Returns a new instance of Consumer.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/amqp/consumer.rb', line 50 def initialize(channel, queue, consumer_tag = nil, exclusive = false, no_ack = false, arguments = {}, no_local = false, &block) @callbacks = Hash.new @channel = channel || raise(ArgumentError, "channel is nil") @connection = channel.connection || raise(ArgumentError, "connection is nil") @queue = queue || raise(ArgumentError, "queue is nil") @consumer_tag = consumer_tag || self.class.tag_generator.generate_for(queue) @exclusive = exclusive @no_ack = no_ack @arguments = arguments @no_local = no_local self.register_with_channel self.register_with_queue end |
Instance Attribute Details
#arguments ⇒ Hash (readonly)
Returns Custom subscription metadata.
35 36 37 |
# File 'lib/amqp/consumer.rb', line 35 def arguments @arguments end |
#channel ⇒ AMQP::Channel (readonly)
Returns Channel this consumer uses.
29 30 31 |
# File 'lib/amqp/consumer.rb', line 29 def channel @channel end |
#consumer_tag ⇒ String (readonly)
Returns Consumer tag, unique consumer identifier.
33 34 35 |
# File 'lib/amqp/consumer.rb', line 33 def consumer_tag @consumer_tag end |
#queue ⇒ AMQP::Queue (readonly)
Returns Queue messages are consumed from.
31 32 33 |
# File 'lib/amqp/consumer.rb', line 31 def queue @queue end |
Class Method Details
.tag_generator ⇒ AMQP::ConsumerTagGenerator
Returns Consumer tag generator.
39 40 41 |
# File 'lib/amqp/consumer.rb', line 39 def self.tag_generator @tag_generator ||= AMQP::ConsumerTagGenerator.new end |
.tag_generator=(generator) ⇒ AMQP::ConsumerTagGenerator
Returns Provided argument.
45 46 47 |
# File 'lib/amqp/consumer.rb', line 45 def self.tag_generator=(generator) @tag_generator = generator end |
Instance Method Details
#acknowledge(delivery_tag) ⇒ Consumer
Acknowledge a delivery tag.
200 201 202 203 204 |
# File 'lib/amqp/consumer.rb', line 200 def acknowledge(delivery_tag) @channel.acknowledge(delivery_tag) self end |
#auto_recover ⇒ Object
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
275 276 277 278 279 |
# File 'lib/amqp/consumer.rb', line 275 def auto_recover self.exec_callback_yielding_self(:before_recovery) self.resubscribe self.exec_callback_yielding_self(:after_recovery) end |
#before_recovery(&block) ⇒ Object
Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).
255 256 257 |
# File 'lib/amqp/consumer.rb', line 255 def before_recovery(&block) self.redefine_callback(:before_recovery, &block) end |
#cancel(nowait = false, &block) ⇒ AMQP::Consumer
Returns self.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/amqp/consumer.rb', line 114 def cancel(nowait = false, &block) @channel.once_open do @queue.once_declared do @connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@channel.id, @consumer_tag, nowait)) if !nowait self.redefine_callback(:cancel, &block) @channel.consumers_awaiting_cancel_ok.push(self) end self end end self end |
#consume(nowait = false, &block) ⇒ AMQP::Consumer
Begin consuming messages from the queue
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/amqp/consumer.rb', line 76 def consume(nowait = false, &block) @channel.once_open do @queue.once_declared do @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, nowait, @arguments)) if !nowait self.redefine_callback(:consume, &block) @channel.consumers_awaiting_consume_ok.push(self) end self end end self end |
#exclusive? ⇒ Boolean
Returns true if this consumer is exclusive (other consumers for the same queue are not allowed).
68 69 70 |
# File 'lib/amqp/consumer.rb', line 68 def exclusive? !!@exclusive end |
#handle_cancel(basic_cancel) ⇒ Object
on_cancel(&block)
187 188 189 |
# File 'lib/amqp/consumer.rb', line 187 def handle_cancel(basic_cancel) self.exec_callback(:scancel, basic_cancel) end |
#handle_cancel_ok(cancel_ok) ⇒ Object
handle_consume_ok(consume_ok)
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 |
# File 'lib/amqp/consumer.rb', line 301 def handle_cancel_ok(cancel_ok) self.exec_callback_once(:cancel, cancel_ok) self.unregister_with_channel self.unregister_with_queue @consumer_tag = nil # detach from object graph so that this object will be garbage-collected @queue = nil @channel = nil @connection = nil self.clear_callbacks(:delivery) self.clear_callbacks(:consume) self.clear_callbacks(:cancel) self.clear_callbacks(:scancel) end |
#handle_consume_ok(consume_ok) ⇒ Object
handle_delivery(basic_deliver, metadata, payload)
297 298 299 |
# File 'lib/amqp/consumer.rb', line 297 def handle_consume_ok(consume_ok) self.exec_callback_once(:consume, consume_ok) end |
#handle_delivery(basic_deliver, metadata, payload) ⇒ Object
Implementation
293 294 295 |
# File 'lib/amqp/consumer.rb', line 293 def handle_delivery(basic_deliver, , payload) self.exec_callback(:delivery, basic_deliver, , payload) end |
#inspect ⇒ String
Returns Readable representation of relevant object state.
176 177 178 |
# File 'lib/amqp/consumer.rb', line 176 def inspect "#<AMQP::Consumer:#{@consumer_tag}> queue=#{@queue.name} channel=#{@channel.id} callbacks=#{@callbacks.inspect}" end |
#on_cancel(&block) ⇒ Object
181 182 183 184 185 |
# File 'lib/amqp/consumer.rb', line 181 def on_cancel(&block) self.append_callback(:scancel, &block) self end |
#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).
239 240 241 |
# File 'lib/amqp/consumer.rb', line 239 def on_connection_interruption(&block) self.redefine_callback(:after_connection_interruption, &block) end |
#on_delivery(&block) ⇒ AMQP::Consumer
Register a block that will be used to handle delivered messages.
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/amqp/consumer.rb', line 152 def on_delivery(&block) # We have to maintain this multiple arities jazz # because older versions this gem are used in examples in at least 3 # books published by O'Reilly :(. MK. delivery_shim = Proc.new { |basic_deliver, headers, payload| case block.arity when 1 then block.call(payload) when 2 then h = Header.new(@channel, basic_deliver, headers.decode_payload) block.call(h, payload) else h = Header.new(@channel, basic_deliver, headers.decode_payload) block.call(h, payload, basic_deliver.consumer_tag, basic_deliver.delivery_tag, basic_deliver.redelivered, basic_deliver.exchange, basic_deliver.routing_key) end } self.append_callback(:delivery, &delivery_shim) self end |
#on_recovery(&block) ⇒ Object Also known as: after_recovery
Defines a callback that will be executed when AMQP connection is recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).
Defines a callback that will be executed when AMQP connection is recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).
225 226 227 |
# File 'lib/amqp/consumer.rb', line 225 def on_recovery(&block) self.redefine_callback(:after_recovery, &block) end |
#register_with_channel ⇒ Object (protected)
354 355 356 |
# File 'lib/amqp/consumer.rb', line 354 def register_with_channel @channel.consumers[@consumer_tag] = self end |
#register_with_queue ⇒ Object (protected)
register_with_channel
358 359 360 |
# File 'lib/amqp/consumer.rb', line 358 def register_with_queue @queue.consumers[@consumer_tag] = self end |
#reject(delivery_tag, requeue = true) ⇒ Consumer
Returns self.
211 212 213 214 215 |
# File 'lib/amqp/consumer.rb', line 211 def reject(delivery_tag, requeue = true) @channel.reject(delivery_tag, requeue) self end |
#resubscribe(&block) ⇒ AMQP::Consumer
Used by automatic recovery code.
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/amqp/consumer.rb', line 96 def resubscribe(&block) @channel.once_open do @queue.once_declared do self.unregister_with_channel @consumer_tag = self.class.tag_generator.generate_for(@queue) self.register_with_channel @connection.send_frame(AMQ::Protocol::Basic::Consume.encode(@channel.id, @queue.name, @consumer_tag, @no_local, @no_ack, @exclusive, block.nil?, @arguments)) self.redefine_callback(:consume, &block) if block self end end self end |
#subscribed? ⇒ Boolean
Queue API compatibility.
134 135 136 |
# File 'lib/amqp/consumer.rb', line 134 def subscribed? !@callbacks[:delivery].empty? end |
#to_s ⇒ Object
284 285 286 |
# File 'lib/amqp/consumer.rb', line 284 def to_s "#<#{self.class.name} @consumer_tag=#{@consumer_tag} @queue=#{@queue.name} @channel=#{@channel.id}>" end |
#unregister_with_channel ⇒ Object (protected)
register_with_queue
362 363 364 |
# File 'lib/amqp/consumer.rb', line 362 def unregister_with_channel @channel.consumers.delete(@consumer_tag) end |
#unregister_with_queue ⇒ Object (protected)
register_with_channel
366 367 368 |
# File 'lib/amqp/consumer.rb', line 366 def unregister_with_queue @queue.consumers.delete(@consumer_tag) end |