Class: AMQ::Client::Async::Queue
- Inherits:
-
Object
- Object
- AMQ::Client::Async::Queue
- Extended by:
- ProtocolMethodHandlers
- Defined in:
- lib/amq/client/async/extensions/rabbitmq/cancel.rb,
lib/amq/client/async/queue.rb
Overview
Consumer
Constant Summary
Constants included from Openable
Instance Attribute Summary collapse
-
#arguments ⇒ Hash
readonly
Additional arguments given on queue declaration.
- #bindings ⇒ Array<Hash> readonly
-
#channel ⇒ AMQ::Client::Channel
readonly
Channel this queue belongs to.
-
#consumers ⇒ Array<Hash>
readonly
All consumers on this queue.
-
#default_consumer ⇒ AMQ::Client::Consumer
readonly
Default consumer (registered with #consume).
-
#name ⇒ String
readonly
Qeueue name.
Attributes included from Entity
Attributes included from Openable
Declaration collapse
-
#declare(passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = nil, &block) ⇒ Queue
Declares this queue.
-
#redeclare(&block) ⇒ Object
Re-declares queue with the same attributes.
Binding collapse
-
#bind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, nowait = false, arguments = nil, &block) ⇒ Queue
Self.
-
#unbind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, arguments = nil, &block) ⇒ Queue
Self.
Consuming messages collapse
-
.consumer_class ⇒ Class
AMQ::Client::Consumer or other class implementing consumer API.
-
#cancel(nowait = false, &block) ⇒ Queue
Unsubscribes from message delivery.
-
#consume(no_ack = false, exclusive = false, nowait = false, no_local = false, arguments = nil, &block) ⇒ Queue
Self.
Working With Messages collapse
-
#get(no_ack = false, &block) ⇒ Queue
Fetches messages from the queue.
- #on_delivery(&block) ⇒ Object
-
#purge(nowait = false, &block) ⇒ Queue
Purges (removes all messagse from) the queue.
Acknowledging & Rejecting Messages collapse
-
#acknowledge(delivery_tag) ⇒ Queue
Acknowledge a delivery tag.
-
#reject(delivery_tag, requeue = true) ⇒ Queue
Self.
Error Handling & Recovery collapse
-
#auto_recover ⇒ Object
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
-
#before_recovery(&block) ⇒ Object
Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened.
-
#on_connection_interruption(&block) ⇒ Object
(also: #after_connection_interruption)
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
-
#on_recovery(&block) ⇒ Object
(also: #after_recovery)
Defines a callback that will be executed when AMQP connection is recovered after a network failure..
Instance Method Summary collapse
-
#auto_delete? ⇒ Boolean
True if this queue was declared as automatically deleted (deleted as soon as last consumer unbinds).
-
#delete(if_unused = false, if_empty = false, nowait = false, &block) ⇒ Queue
Deletes this queue.
-
#durable? ⇒ Boolean
True if this queue was declared as durable (will survive broker restart).
-
#exclusive? ⇒ Boolean
True if this queue was declared as exclusive (limited to just one consumer).
-
#generate_consumer_tag(name) ⇒ String
Unique string supposed to be used as a consumer tag.
-
#handle_bind_ok(method) ⇒ Object
handle_purge_ok(method).
- #handle_declare_ok(method) ⇒ Object
- #handle_delete_ok(method) ⇒ Object
-
#handle_get_empty(method) ⇒ Object
handle_get_ok(method, header, payload).
-
#handle_get_ok(method, header, payload) ⇒ Object
handle_unbind_ok(method).
-
#handle_purge_ok(method) ⇒ Object
handle_delete_ok(method).
-
#handle_unbind_ok(method) ⇒ Object
handle_bind_ok(method).
-
#initialize(connection, channel, name = AMQ::Protocol::EMPTY_STRING) ⇒ Queue
constructor
A new instance of Queue.
Methods included from ProtocolMethodHandlers
Methods included from Extensions::RabbitMQ::Basic::QueueMixin
Methods included from ServerNamedEntity
Methods included from Callbacks
#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback
Methods included from Openable
#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?
Constructor Details
#initialize(connection, channel, name = AMQ::Protocol::EMPTY_STRING) ⇒ Queue
Returns a new instance of Queue.
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/amq/client/async/queue.rb', line 54 def initialize(connection, channel, name = AMQ::Protocol::EMPTY_STRING) raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil? super(connection) @name = name # this has to stay true even after queue.declare-ok arrives. MK. @server_named = @name.empty? if @server_named self.on_connection_interruption do # server-named queue need to get new names after recovery. MK. @name = AMQ::Protocol::EMPTY_STRING end end @channel = channel # primarily for autorecovery. MK. @bindings = Array.new @consumers = Hash.new end |
Instance Attribute Details
#arguments ⇒ Hash (readonly)
Returns Additional arguments given on queue declaration. Typically used by AMQP extensions.
43 44 45 |
# File 'lib/amq/client/async/queue.rb', line 43 def arguments @arguments end |
#bindings ⇒ Array<Hash> (readonly)
46 47 48 |
# File 'lib/amq/client/async/queue.rb', line 46 def bindings @bindings end |
#channel ⇒ AMQ::Client::Channel (readonly)
Channel this queue belongs to.
34 35 36 |
# File 'lib/amq/client/async/queue.rb', line 34 def channel @channel end |
#consumers ⇒ Array<Hash> (readonly)
Returns All consumers on this queue.
37 38 39 |
# File 'lib/amq/client/async/queue.rb', line 37 def consumers @consumers end |
#default_consumer ⇒ AMQ::Client::Consumer (readonly)
Returns Default consumer (registered with #consume).
40 41 42 |
# File 'lib/amq/client/async/queue.rb', line 40 def default_consumer @default_consumer end |
#name ⇒ String (readonly)
Qeueue name. May be server-generated or assigned directly.
30 31 32 |
# File 'lib/amq/client/async/queue.rb', line 30 def name @name end |
Class Method Details
.consumer_class ⇒ Class
Returns AMQ::Client::Consumer or other class implementing consumer API. Used by libraries like Ruby amqp gem.
250 251 252 |
# File 'lib/amq/client/async/queue.rb', line 250 def self.consumer_class AMQ::Client::Async::Consumer end |
Instance Method Details
#acknowledge(delivery_tag) ⇒ Queue
Acknowledge a delivery tag.
348 349 350 351 352 |
# File 'lib/amq/client/async/queue.rb', line 348 def acknowledge(delivery_tag) @channel.acknowledge(delivery_tag) self end |
#auto_delete? ⇒ Boolean
Returns true if this queue was declared as automatically deleted (deleted as soon as last consumer unbinds).
92 93 94 |
# File 'lib/amq/client/async/queue.rb', line 92 def auto_delete? @auto_delete end |
#auto_recover ⇒ Object
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
427 428 429 430 431 432 433 434 435 436 |
# File 'lib/amq/client/async/queue.rb', line 427 def auto_recover self.exec_callback_yielding_self(:before_recovery) self.redeclare do self.rebind @consumers.each { |tag, consumer| consumer.auto_recover } self.exec_callback_yielding_self(:after_recovery) end end |
#before_recovery(&block) ⇒ Object
Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).
393 394 395 |
# File 'lib/amq/client/async/queue.rb', line 393 def before_recovery(&block) self.redefine_callback(:before_recovery, &block) end |
#bind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, nowait = false, arguments = nil, &block) ⇒ Queue
Returns self.
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
# File 'lib/amq/client/async/queue.rb', line 186 def bind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, nowait = false, arguments = nil, &block) nowait = true unless block exchange_name = if exchange.respond_to?(:name) exchange.name else exchange end @connection.send_frame(Protocol::Queue::Bind.encode(@channel.id, @name, exchange_name, routing_key, nowait, arguments)) if !nowait self.append_callback(:bind, &block) @channel.queues_awaiting_bind_ok.push(self) end # store bindings for automatic recovery, but BE VERY CAREFUL to # not cause an infinite rebinding loop here when we recover. MK. binding = { :exchange => exchange_name, :routing_key => routing_key, :arguments => arguments } @bindings.push(binding) unless @bindings.include?(binding) self end |
#cancel(nowait = false, &block) ⇒ Queue
Unsubscribes from message delivery.
275 276 277 278 279 280 281 |
# File 'lib/amq/client/async/queue.rb', line 275 def cancel(nowait = false, &block) raise "There is no default consumer for this queue. This usually means that you are trying to unsubscribe a queue that never was subscribed for messages in the first place." if @default_consumer.nil? @default_consumer.cancel(nowait, &block) self end |
#consume(no_ack = false, exclusive = false, nowait = false, no_local = false, arguments = nil, &block) ⇒ Queue
Returns self.
260 261 262 263 264 265 266 267 268 |
# File 'lib/amq/client/async/queue.rb', line 260 def consume(no_ack = false, exclusive = false, nowait = false, no_local = false, arguments = nil, &block) raise RuntimeError.new("This queue already has default consumer. Please instantiate AMQ::Client::Consumer directly to register additional consumers.") if @default_consumer nowait = true unless block @default_consumer = self.class.consumer_class.new(@channel, self, generate_consumer_tag(@name), exclusive, no_ack, arguments, no_local, &block) @default_consumer.consume(nowait, &block) self end |
#declare(passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = nil, &block) ⇒ Queue
Declares this queue.
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/amq/client/async/queue.rb', line 106 def declare(passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = nil, &block) raise ArgumentError, "declaration with nowait does not make sense for server-named queues! Either specify name other than empty string or use #declare without nowait" if nowait && self.anonymous? # these two are for autorecovery. MK. @passive = passive @server_named = @name.empty? @durable = durable @exclusive = exclusive @auto_delete = auto_delete @arguments = arguments nowait = true if !block && !@name.empty? && nowait.nil? @connection.send_frame(Protocol::Queue::Declare.encode(@channel.id, @name, passive, durable, exclusive, auto_delete, nowait, arguments)) if !nowait self.append_callback(:declare, &block) @channel.queues_awaiting_declare_ok.push(self) end self end |
#delete(if_unused = false, if_empty = false, nowait = false, &block) ⇒ Queue
Deletes this queue.
163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/amq/client/async/queue.rb', line 163 def delete(if_unused = false, if_empty = false, nowait = false, &block) nowait = true unless block @connection.send_frame(Protocol::Queue::Delete.encode(@channel.id, @name, if_unused, if_empty, nowait)) if !nowait self.append_callback(:delete, &block) # TODO: delete itself from queues cache @channel.queues_awaiting_delete_ok.push(self) end self end |
#durable? ⇒ Boolean
Returns true if this queue was declared as durable (will survive broker restart).
80 81 82 |
# File 'lib/amq/client/async/queue.rb', line 80 def durable? @durable end |
#exclusive? ⇒ Boolean
Returns true if this queue was declared as exclusive (limited to just one consumer).
86 87 88 |
# File 'lib/amq/client/async/queue.rb', line 86 def exclusive? @exclusive end |
#generate_consumer_tag(name) ⇒ String
Unique string supposed to be used as a consumer tag.
450 451 452 |
# File 'lib/amq/client/async/queue.rb', line 450 def generate_consumer_tag(name) "#{name}-#{Time.now.to_i * 1000}-#{Kernel.rand(999_999_999_999)}" end |
#get(no_ack = false, &block) ⇒ Queue
Fetches messages from the queue.
303 304 305 306 307 308 309 310 311 312 313 314 315 |
# File 'lib/amq/client/async/queue.rb', line 303 def get(no_ack = false, &block) @connection.send_frame(Protocol::Basic::Get.encode(@channel.id, @name, no_ack)) # most people only want one callback per #get call. Consider the following example: # # 100.times { queue.get { ... } } # # most likely you won't expect 100 callback runs per message here. MK. self.redefine_callback(:get, &block) @channel.queues_awaiting_get_response.push(self) self end |
#handle_bind_ok(method) ⇒ Object
handle_purge_ok(method)
475 476 477 |
# File 'lib/amq/client/async/queue.rb', line 475 def handle_bind_ok(method) self.exec_callback_once(:bind, method) end |
#handle_declare_ok(method) ⇒ Object
460 461 462 463 464 465 |
# File 'lib/amq/client/async/queue.rb', line 460 def handle_declare_ok(method) @name = method.queue if @name.empty? @channel.register_queue(self) self.exec_callback_once_yielding_self(:declare, method) end |
#handle_delete_ok(method) ⇒ Object
467 468 469 |
# File 'lib/amq/client/async/queue.rb', line 467 def handle_delete_ok(method) self.exec_callback_once(:delete, method) end |
#handle_get_empty(method) ⇒ Object
handle_get_ok(method, header, payload)
488 489 490 491 |
# File 'lib/amq/client/async/queue.rb', line 488 def handle_get_empty(method) method = Protocol::GetResponse.new(method) self.exec_callback(:get, method) end |
#handle_get_ok(method, header, payload) ⇒ Object
handle_unbind_ok(method)
483 484 485 486 |
# File 'lib/amq/client/async/queue.rb', line 483 def handle_get_ok(method, header, payload) method = Protocol::GetResponse.new(method) self.exec_callback(:get, method, header, payload) end |
#handle_purge_ok(method) ⇒ Object
handle_delete_ok(method)
471 472 473 |
# File 'lib/amq/client/async/queue.rb', line 471 def handle_purge_ok(method) self.exec_callback_once(:purge, method) end |
#handle_unbind_ok(method) ⇒ Object
handle_bind_ok(method)
479 480 481 |
# File 'lib/amq/client/async/queue.rb', line 479 def handle_unbind_ok(method) self.exec_callback_once(:unbind, method) end |
#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).
376 377 378 |
# File 'lib/amq/client/async/queue.rb', line 376 def on_connection_interruption(&block) self.redefine_callback(:after_connection_interruption, &block) end |
#on_delivery(&block) ⇒ Object
293 294 295 |
# File 'lib/amq/client/async/queue.rb', line 293 def on_delivery(&block) @default_consumer.on_delivery(&block) end |
#on_recovery(&block) ⇒ Object Also known as: after_recovery
Defines a callback that will be executed when AMQP connection is recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).
409 410 411 |
# File 'lib/amq/client/async/queue.rb', line 409 def on_recovery(&block) self.redefine_callback(:after_recovery, &block) end |
#purge(nowait = false, &block) ⇒ Queue
Purges (removes all messagse from) the queue.
324 325 326 327 328 329 330 331 332 333 334 335 |
# File 'lib/amq/client/async/queue.rb', line 324 def purge(nowait = false, &block) nowait = true unless block @connection.send_frame(Protocol::Queue::Purge.encode(@channel.id, @name, nowait)) if !nowait self.redefine_callback(:purge, &block) # TODO: handle channel & connection-level exceptions @channel.queues_awaiting_purge_ok.push(self) end self end |
#redeclare(&block) ⇒ Object
Re-declares queue with the same attributes
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/amq/client/async/queue.rb', line 131 def redeclare(&block) nowait = true if !block && !@name.empty? # server-named queues get their new generated names. new_name = if @server_named AMQ::Protocol::EMPTY_STRING else @name end @connection.send_frame(Protocol::Queue::Declare.encode(@channel.id, new_name, @passive, @durable, @exclusive, @auto_delete, false, @arguments)) if !nowait self.append_callback(:declare, &block) @channel.queues_awaiting_declare_ok.push(self) end self end |
#reject(delivery_tag, requeue = true) ⇒ Queue
Returns self.
359 360 361 362 363 |
# File 'lib/amq/client/async/queue.rb', line 359 def reject(delivery_tag, requeue = true) @channel.reject(delivery_tag, requeue) self end |
#unbind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, arguments = nil, &block) ⇒ Queue
Returns self.
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/amq/client/async/queue.rb', line 215 def unbind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, arguments = nil, &block) exchange_name = if exchange.respond_to?(:name) exchange.name else exchange end @connection.send_frame(Protocol::Queue::Unbind.encode(@channel.id, @name, exchange_name, routing_key, arguments)) self.append_callback(:unbind, &block) @channel.queues_awaiting_unbind_ok.push(self) @bindings.delete_if { |b| b[:exchange] == exchange_name } self end |