Class: AMQ::Client::Async::Queue

Inherits:
Object
  • Object
show all
Extended by:
ProtocolMethodHandlers
Includes:
Entity, Extensions::RabbitMQ::Basic::QueueMixin, ServerNamedEntity
Defined in:
lib/amq/client/async/extensions/rabbitmq/cancel.rb,
lib/amq/client/async/queue.rb

Overview

Consumer

Constant Summary

Constants included from Openable

Openable::VALUES

Instance Attribute Summary collapse

Attributes included from Entity

#callbacks

Attributes included from Openable

#status

Declaration collapse

Binding collapse

Consuming messages collapse

Working With Messages collapse

Acknowledging & Rejecting Messages collapse

Error Handling & Recovery collapse

Instance Method Summary collapse

Methods included from ProtocolMethodHandlers

handle, handlers

Methods included from Extensions::RabbitMQ::Basic::QueueMixin

#on_cancel

Methods included from ServerNamedEntity

#server_named?

Methods included from Callbacks

#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback

Methods included from Openable

#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?

Constructor Details

#initialize(connection, channel, name = AMQ::Protocol::EMPTY_STRING) ⇒ Queue

Returns a new instance of Queue.

Parameters:

  • connection (AMQ::Client::Adapter)

    networking adapter to use.

  • channel (AMQ::Client::Channel)

    channel this queue object uses.

  • name (String)

    Queue name. Please note that AMQP spec does not require brokers to support Unicode for queue names.

Raises:

  • (ArgumentError)


54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/amq/client/async/queue.rb', line 54

# Builds a queue object attached to the given connection and channel.
#
# @param connection [AMQ::Client::Adapter] networking adapter to use.
# @param channel [AMQ::Client::Channel] channel this queue object uses.
# @param name [String] queue name; an empty string means the broker generates one.
# @raise [ArgumentError] if name is nil.
def initialize(connection, channel, name = AMQ::Protocol::EMPTY_STRING)
  if name.nil?
    raise ArgumentError, "queue name must not be nil; if you want broker to generate queue name for you, pass an empty string"
  end

  super(connection)

  @name = name

  # An empty name marks the queue as server-named. The flag
  # has to stay true even after queue.declare-ok arrives. MK.
  @server_named = @name.empty?

  # server-named queue need to get new names after recovery, so the
  # generated name is dropped when the connection is interrupted. MK.
  on_connection_interruption { @name = AMQ::Protocol::EMPTY_STRING } if @server_named

  @channel = channel

  # primarily for autorecovery. MK.
  @bindings = []

  @consumers = {}
end

Instance Attribute Details

#arguments ⇒ Hash (readonly)

Returns Additional arguments given on queue declaration. Typically used by AMQP extensions.

Returns:

  • (Hash)

    Additional arguments given on queue declaration. Typically used by AMQP extensions.



43
44
45
# File 'lib/amq/client/async/queue.rb', line 43

# Additional arguments given on queue declaration (stored by #declare).
#
# @return [Hash, nil] the stored arguments; nil until #declare has assigned them.
def arguments
  @arguments
end

#bindings ⇒ Array<Hash> (readonly)

Returns:

  • (Array<Hash>)


46
47
48
# File 'lib/amq/client/async/queue.rb', line 46

# Bindings recorded by #bind, kept primarily for automatic recovery.
#
# @return [Array<Hash>] one hash per binding with :exchange, :routing_key and :arguments keys.
def bindings
  @bindings
end

#channel ⇒ AMQ::Client::Channel (readonly)

Channel this queue belongs to.

Returns:

  • (AMQ::Client::Channel)


34
35
36
# File 'lib/amq/client/async/queue.rb', line 34

# Channel this queue belongs to (the one passed to the constructor).
#
# @return [AMQ::Client::Channel]
def channel
  @channel
end

#consumers ⇒ Hash (readonly)

Returns All consumers on this queue, keyed by consumer tag.

Returns:

  • (Hash)

    All consumers on this queue, keyed by consumer tag.



37
38
39
# File 'lib/amq/client/async/queue.rb', line 37

# All consumers on this queue.
#
# @return [Hash] consumers registry; #auto_recover iterates it as (tag, consumer) pairs.
def consumers
  @consumers
end

#default_consumer ⇒ AMQ::Client::Consumer (readonly)

Returns Default consumer (registered with #consume).

Returns:

  • (AMQ::Client::Consumer)

    Default consumer (registered with #consume).



40
41
42
# File 'lib/amq/client/async/queue.rb', line 40

# Default consumer (registered with #consume).
#
# @return [AMQ::Client::Consumer, nil] nil until #consume has created one.
def default_consumer
  @default_consumer
end

#name ⇒ String (readonly)

Queue name. May be server-generated or assigned directly.

Returns:

  • (String)


30
31
32
# File 'lib/amq/client/async/queue.rb', line 30

# Queue name. Either assigned directly in the constructor or taken from
# the broker's declare-ok reply when the queue was created with an empty name.
#
# @return [String]
def name
  @name
end

Class Method Details

.consumer_class ⇒ Class

Returns AMQ::Client::Consumer or other class implementing consumer API. Used by libraries like Ruby amqp gem.

Returns:

  • (Class)

    AMQ::Client::Consumer or other class implementing consumer API. Used by libraries like Ruby amqp gem.



250
251
252
# File 'lib/amq/client/async/queue.rb', line 250

# Class that #consume instantiates for the default consumer.
# Libraries building on this one can override it to supply another
# class implementing the consumer API.
#
# @return [Class] AMQ::Client::Async::Consumer
def self.consumer_class
  AMQ::Client::Async::Consumer
end

Instance Method Details

#acknowledge(delivery_tag) ⇒ Queue

Acknowledge a delivery tag.



348
349
350
351
352
# File 'lib/amq/client/async/queue.rb', line 348

# Acknowledges a delivery by delegating to the channel.
#
# @param delivery_tag [Object] tag of the delivery to acknowledge; passed to the channel untouched.
# @return [Queue] self, to allow chaining.
def acknowledge(delivery_tag)
  tap { @channel.acknowledge(delivery_tag) }
end

#auto_delete? ⇒ Boolean

Returns true if this queue was declared as automatically deleted (deleted as soon as last consumer unsubscribes).

Returns:

  • (Boolean)

    true if this queue was declared as automatically deleted (deleted as soon as last consumer unsubscribes).



92
93
94
# File 'lib/amq/client/async/queue.rb', line 92

# Whether this queue was declared as auto-deleted.
#
# @return [Boolean, nil] the auto_delete value last given to #declare; nil before any declaration.
def auto_delete?
  @auto_delete
end

#auto_recover ⇒ Object

Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).



427
428
429
430
431
432
433
434
435
436
# File 'lib/amq/client/async/queue.rb', line 427

# Recovers this queue after the connection has been re-established:
# runs the :before_recovery callback, re-declares the queue and, once
# that completes, restores bindings, recovers every consumer and runs
# the :after_recovery callback.
#
# @return [Object] whatever #redeclare returns.
def auto_recover
  exec_callback_yielding_self(:before_recovery)

  redeclare do
    rebind
    @consumers.each_value { |consumer| consumer.auto_recover }
    exec_callback_yielding_self(:after_recovery)
  end
end

#before_recovery(&block) ⇒ Object

Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).



393
394
395
# File 'lib/amq/client/async/queue.rb', line 393

# Registers the block as the :before_recovery callback, which #auto_recover
# executes before re-declaring the queue. Registration goes through
# redefine_callback, so a later call replaces the earlier block.
#
# @yield callback body to run before recovery.
def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end

#bind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, nowait = false, arguments = nil, &block) ⇒ Queue

Returns self.



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/amq/client/async/queue.rb', line 186

# Binds this queue to an exchange and remembers the binding for recovery.
#
# @param exchange [#name, String] exchange object (anything responding to #name) or exchange name.
# @param routing_key [String] routing key of the binding.
# @param nowait [Boolean] when true the broker is asked not to reply; forced to true if no block is given.
# @param arguments [Hash, nil] additional binding arguments.
# @yield invoked when the bind confirmation is handled (only registered when waiting for a reply).
# @return [Queue] self.
def bind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, nowait = false, arguments = nil, &block)
  # Without a callback there is nothing to run on confirmation, so do not ask for one.
  nowait = true if block.nil?

  exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange

  frame = Protocol::Queue::Bind.encode(@channel.id, @name, exchange_name, routing_key, nowait, arguments)
  @connection.send_frame(frame)

  unless nowait
    append_callback(:bind, &block)
    @channel.queues_awaiting_bind_ok.push(self)
  end

  # store bindings for automatic recovery, but BE VERY CAREFUL to
  # not cause an infinite rebinding loop here when we recover. MK.
  record = { :exchange => exchange_name, :routing_key => routing_key, :arguments => arguments }
  @bindings << record unless @bindings.include?(record)

  self
end

#cancel(nowait = false, &block) ⇒ Queue

Unsubscribes from message delivery.



275
276
277
278
279
280
281
# File 'lib/amq/client/async/queue.rb', line 275

# Unsubscribes from message delivery by cancelling the default consumer.
#
# @param nowait [Boolean] forwarded to the consumer's #cancel.
# @yield forwarded to the consumer's #cancel.
# @return [Queue] self.
# @raise [RuntimeError] if no default consumer has been registered.
def cancel(nowait = false, &block)
  consumer = @default_consumer
  if consumer.nil?
    raise "There is no default consumer for this queue. This usually means that you are trying to unsubscribe a queue that never was subscribed for messages in the first place."
  end

  consumer.cancel(nowait, &block)

  self
end

#consume(no_ack = false, exclusive = false, nowait = false, no_local = false, arguments = nil, &block) ⇒ Queue

Returns self.

Returns:

Raises:

  • (RuntimeError)

See Also:



260
261
262
263
264
265
266
267
268
# File 'lib/amq/client/async/queue.rb', line 260

# Creates the default consumer for this queue and starts consuming.
#
# @param no_ack [Boolean] forwarded to the consumer constructor.
# @param exclusive [Boolean] forwarded to the consumer constructor.
# @param nowait [Boolean] forwarded to the consumer's #consume; forced to true if no block is given.
# @param no_local [Boolean] forwarded to the consumer constructor.
# @param arguments [Hash, nil] forwarded to the consumer constructor.
# @yield forwarded to both the consumer constructor and its #consume.
# @return [Queue] self.
# @raise [RuntimeError] if a default consumer already exists.
def consume(no_ack = false, exclusive = false, nowait = false, no_local = false, arguments = nil, &block)
  if @default_consumer
    raise RuntimeError, "This queue already has default consumer. Please instantiate AMQ::Client::Consumer directly to register additional consumers."
  end

  nowait = true if block.nil?

  consumer_factory  = self.class.consumer_class
  @default_consumer = consumer_factory.new(@channel, self, generate_consumer_tag(@name), exclusive, no_ack, arguments, no_local, &block)
  @default_consumer.consume(nowait, &block)

  self
end

#declare(passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = nil, &block) ⇒ Queue

Declares this queue.

Returns:

Raises:

  • (ArgumentError)

See Also:



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/amq/client/async/queue.rb', line 106

# Declares this queue and remembers the declaration attributes so the
# queue can later be re-declared with the same settings.
#
# @param passive [Boolean] passive flag sent to the broker.
# @param durable [Boolean] durable flag sent to the broker.
# @param exclusive [Boolean] exclusive flag sent to the broker.
# @param auto_delete [Boolean] auto-delete flag sent to the broker.
# @param nowait [Boolean, nil] when true the broker is asked not to reply.
# @param arguments [Hash, nil] additional declaration arguments.
# @yield registered as the :declare callback when a reply is awaited.
# @return [Queue] self.
# @raise [ArgumentError] if nowait is requested for a queue that reports itself as anonymous.
def declare(passive = false, durable = false, exclusive = false, auto_delete = false, nowait = false, arguments = nil, &block)
  if nowait && anonymous?
    raise ArgumentError, "declaration with nowait does not make sense for server-named queues! Either specify name other than empty string or use #declare without nowait"
  end

  # these two are for autorecovery. MK.
  @passive      = passive
  @server_named = @name.empty?

  @durable     = durable
  @exclusive   = exclusive
  @auto_delete = auto_delete
  @arguments   = arguments

  # Only an explicit nil nowait is promoted to true, and only for a
  # named queue declared without a callback.
  nowait = true if nowait.nil? && block.nil? && !@name.empty?

  frame = Protocol::Queue::Declare.encode(@channel.id, @name, passive, durable, exclusive, auto_delete, nowait, arguments)
  @connection.send_frame(frame)

  unless nowait
    append_callback(:declare, &block)
    @channel.queues_awaiting_declare_ok.push(self)
  end

  self
end

#delete(if_unused = false, if_empty = false, nowait = false, &block) ⇒ Queue

Deletes this queue.

Parameters:

  • if_unused (Boolean) (defaults to: false)

    delete only if queue has no consumers (subscribers).

  • if_empty (Boolean) (defaults to: false)

    delete only if queue has no messages in it.

  • nowait (Boolean) (defaults to: false)

    Don’t wait for reply from broker.

Returns:

See Also:



163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/amq/client/async/queue.rb', line 163

# Deletes this queue.
#
# @param if_unused [Boolean] delete only if queue has no consumers (subscribers).
# @param if_empty [Boolean] delete only if queue has no messages in it.
# @param nowait [Boolean] don't wait for reply from broker; forced to true if no block is given.
# @yield registered as the :delete callback when a reply is awaited.
# @return [Queue] self.
def delete(if_unused = false, if_empty = false, nowait = false, &block)
  nowait = true if block.nil?

  frame = Protocol::Queue::Delete.encode(@channel.id, @name, if_unused, if_empty, nowait)
  @connection.send_frame(frame)

  return self if nowait

  append_callback(:delete, &block)

  # TODO: delete itself from queues cache
  @channel.queues_awaiting_delete_ok.push(self)

  self
end

#durable? ⇒ Boolean

Returns true if this queue was declared as durable (will survive broker restart).

Returns:

  • (Boolean)

    true if this queue was declared as durable (will survive broker restart).



80
81
82
# File 'lib/amq/client/async/queue.rb', line 80

# Whether this queue was declared as durable.
#
# @return [Boolean, nil] the durable value last given to #declare; nil before any declaration.
def durable?
  @durable
end

#exclusive? ⇒ Boolean

Returns true if this queue was declared as exclusive (limited to just one consumer).

Returns:

  • (Boolean)

    true if this queue was declared as exclusive (limited to just one consumer)



86
87
88
# File 'lib/amq/client/async/queue.rb', line 86

# Whether this queue was declared as exclusive.
#
# @return [Boolean, nil] the exclusive value last given to #declare; nil before any declaration.
def exclusive?
  @exclusive
end

#generate_consumer_tag(name) ⇒ String

Unique string supposed to be used as a consumer tag.

Returns:

  • (String)

    Unique string.



450
451
452
# File 'lib/amq/client/async/queue.rb', line 450

# Builds a string intended for use as a consumer tag. It combines the
# given name, the current time (whole seconds scaled by 1000) and a
# random number, separated by dashes.
#
# @param name [String] prefix of the tag (#consume passes the queue name).
# @return [String] the generated tag.
def generate_consumer_tag(name)
  timestamp = Time.now.to_i * 1000
  nonce     = Kernel.rand(999_999_999_999)

  "#{name}-#{timestamp}-#{nonce}"
end

#get(no_ack = false, &block) ⇒ Queue

Fetches messages from the queue.



303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/amq/client/async/queue.rb', line 303

# Fetches messages from the queue by sending a basic.get request.
#
# @param no_ack [Boolean] no-ack flag sent with the request.
# @yield replaces the current :get callback, which runs when the response is handled.
# @return [Queue] self.
def get(no_ack = false, &block)
  frame = Protocol::Basic::Get.encode(@channel.id, @name, no_ack)
  @connection.send_frame(frame)

  # The callback is replaced rather than appended because most people only
  # want one callback per #get call. Consider the following example:
  #
  # 100.times { queue.get { ... } }
  #
  # most likely you won't expect 100 callback runs per message here. MK.
  redefine_callback(:get, &block)
  @channel.queues_awaiting_get_response.push(self)

  self
end

#handle_bind_ok(method) ⇒ Object

Executes the :bind callback via exec_callback_once, passing it the received method.



475
476
477
# File 'lib/amq/client/async/queue.rb', line 475

# Handles the confirmation of a bind request by running the :bind
# callback through exec_callback_once.
#
# @param method [Object] received method, handed to the callback.
def handle_bind_ok(method)
  self.exec_callback_once(:bind, method)
end

#handle_declare_ok(method) ⇒ Object



460
461
462
463
464
465
# File 'lib/amq/client/async/queue.rb', line 460

# Handles the broker's reply to a queue declaration: adopts the queue
# name from the reply if none is set, registers the queue with its
# channel and runs the :declare callback.
#
# @param method [#queue] received method; its #queue value supplies the name.
def handle_declare_ok(method)
  # Only an empty (server-named) queue takes its name from the reply.
  @name = method.queue if @name.empty?
  @channel.register_queue(self)

  self.exec_callback_once_yielding_self(:declare, method)
end

#handle_delete_ok(method) ⇒ Object



467
468
469
# File 'lib/amq/client/async/queue.rb', line 467

# Handles the confirmation of a delete request by running the :delete
# callback through exec_callback_once.
#
# @param method [Object] received method, handed to the callback.
def handle_delete_ok(method)
  self.exec_callback_once(:delete, method)
end

#handle_get_empty(method) ⇒ Object

Wraps the received method in Protocol::GetResponse and executes the :get callback with it.



488
489
490
491
# File 'lib/amq/client/async/queue.rb', line 488

# Handles a get-empty reply: wraps the received method in a
# Protocol::GetResponse and runs the :get callback with it.
#
# @param method [Object] received method.
# @return [Object] whatever exec_callback returns.
def handle_get_empty(method)
  response = Protocol::GetResponse.new(method)
  exec_callback(:get, response)
end

#handle_get_ok(method, header, payload) ⇒ Object

Wraps the received method in Protocol::GetResponse and executes the :get callback with the response, header and payload.



483
484
485
486
# File 'lib/amq/client/async/queue.rb', line 483

# Handles a get-ok reply: wraps the received method in a
# Protocol::GetResponse and runs the :get callback with the response,
# the message header and the payload.
#
# @param method [Object] received method.
# @param header [Object] message header, handed to the callback.
# @param payload [Object] message payload, handed to the callback.
# @return [Object] whatever exec_callback returns.
def handle_get_ok(method, header, payload)
  response = Protocol::GetResponse.new(method)
  exec_callback(:get, response, header, payload)
end

#handle_purge_ok(method) ⇒ Object

Executes the :purge callback via exec_callback_once, passing it the received method.



471
472
473
# File 'lib/amq/client/async/queue.rb', line 471

# Handles the confirmation of a purge request by running the :purge
# callback through exec_callback_once.
#
# @param method [Object] received method, handed to the callback.
def handle_purge_ok(method)
  self.exec_callback_once(:purge, method)
end

#handle_unbind_ok(method) ⇒ Object

Executes the :unbind callback via exec_callback_once, passing it the received method.



479
480
481
# File 'lib/amq/client/async/queue.rb', line 479

# Handles the confirmation of an unbind request by running the :unbind
# callback through exec_callback_once.
#
# @param method [Object] received method, handed to the callback.
def handle_unbind_ok(method)
  self.exec_callback_once(:unbind, method)
end

#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).



376
377
378
# File 'lib/amq/client/async/queue.rb', line 376

# Registers the block as the :after_connection_interruption callback.
# Registration goes through redefine_callback, so a later call replaces
# the earlier block. The constructor uses this hook on server-named
# queues to reset their name.
#
# @yield callback body to run after the connection is interrupted.
def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end

#on_delivery(&block) ⇒ Object



293
294
295
# File 'lib/amq/client/async/queue.rb', line 293

# Registers a delivery handler on the default consumer.
#
# @yield forwarded to the default consumer's #on_delivery.
# @return [Object] whatever the consumer's #on_delivery returns.
# @raise [RuntimeError] if no default consumer has been registered yet (see #consume).
def on_delivery(&block)
  # Fail with an explanation instead of a NoMethodError on nil, mirroring the guard in #cancel.
  if @default_consumer.nil?
    raise "There is no default consumer for this queue. This usually means that you are trying to register a delivery handler on a queue that never was subscribed for messages in the first place."
  end

  @default_consumer.on_delivery(&block)
end

#on_recovery(&block) ⇒ Object Also known as: after_recovery

Defines a callback that will be executed when AMQP connection is recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).



409
410
411
# File 'lib/amq/client/async/queue.rb', line 409

# Registers the block as the :after_recovery callback, which #auto_recover
# executes once the queue has been re-declared, re-bound and its consumers
# recovered. Registration goes through redefine_callback, so a later call
# replaces the earlier block.
#
# @yield callback body to run after recovery.
def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end

#purge(nowait = false, &block) ⇒ Queue

Purges (removes all messages from) the queue.



324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/amq/client/async/queue.rb', line 324

# Purges (removes all messages from) the queue.
#
# @param nowait [Boolean] don't wait for reply from broker; forced to true if no block is given.
# @yield replaces the current :purge callback when a reply is awaited.
# @return [Queue] self.
def purge(nowait = false, &block)
  nowait = true if block.nil?

  frame = Protocol::Queue::Purge.encode(@channel.id, @name, nowait)
  @connection.send_frame(frame)

  return self if nowait

  redefine_callback(:purge, &block)
  # TODO: handle channel & connection-level exceptions
  @channel.queues_awaiting_purge_ok.push(self)

  self
end

#redeclare(&block) ⇒ Object

Re-declares queue with the same attributes



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/amq/client/async/queue.rb', line 131

# Re-declares queue with the same attributes that were given to #declare.
#
# The no-wait flag sent to the broker now matches the local bookkeeping:
# a reply is requested exactly when this queue is pushed onto the
# channel's list of queues awaiting declare-ok. Previously the frame
# always requested a reply, even when the queue was not registered to
# receive it.
#
# @yield registered as the :declare callback when a reply is awaited.
# @return [Queue] self.
def redeclare(&block)
  # server-named queues get their new generated names.
  new_name = @server_named ? AMQ::Protocol::EMPTY_STRING : @name

  # A reply may be skipped only for a named queue re-declared without a
  # callback; a server-named queue always needs declare-ok to learn its name.
  nowait = block.nil? && !new_name.empty?

  frame = Protocol::Queue::Declare.encode(@channel.id, new_name, @passive, @durable, @exclusive, @auto_delete, nowait, @arguments)
  @connection.send_frame(frame)

  unless nowait
    append_callback(:declare, &block)
    @channel.queues_awaiting_declare_ok.push(self)
  end

  self
end

#reject(delivery_tag, requeue = true) ⇒ Queue

Returns self.



359
360
361
362
363
# File 'lib/amq/client/async/queue.rb', line 359

# Rejects a delivery by delegating to the channel.
#
# @param delivery_tag [Object] tag of the delivery to reject; passed to the channel untouched.
# @param requeue [Boolean] requeue flag passed to the channel.
# @return [Queue] self.
def reject(delivery_tag, requeue = true)
  tap { @channel.reject(delivery_tag, requeue) }
end

#unbind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, arguments = nil, &block) ⇒ Queue

Returns self.



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/amq/client/async/queue.rb', line 215

# Removes a binding between this queue and an exchange and forgets it
# for the purposes of automatic recovery.
#
# Only the recorded binding matching the exchange, routing key and
# arguments is dropped. Other bindings to the same exchange stay
# recorded so they are not lost on recovery.
#
# @param exchange [#name, String] exchange object (anything responding to #name) or exchange name.
# @param routing_key [String] routing key of the binding to remove.
# @param arguments [Hash, nil] arguments of the binding to remove.
# @yield registered as the :unbind callback.
# @return [Queue] self.
def unbind(exchange, routing_key = AMQ::Protocol::EMPTY_STRING, arguments = nil, &block)
  exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange

  frame = Protocol::Queue::Unbind.encode(@channel.id, @name, exchange_name, routing_key, arguments)
  @connection.send_frame(frame)

  append_callback(:unbind, &block)
  @channel.queues_awaiting_unbind_ok.push(self)

  # Match on the same attributes #bind records.
  @bindings.delete_if do |b|
    b[:exchange] == exchange_name && b[:routing_key] == routing_key && b[:arguments] == arguments
  end

  self
end