Class: MQ::Queue

Inherits:
Object
  • Object
show all
Includes:
AMQP
Defined in:
lib/mq/queue.rb

Constant Summary

Constants included from AMQP

AMQP::FIELDS, AMQP::HEADER, AMQP::PORT, AMQP::RESPONSES, AMQP::VERSION, AMQP::VERSION_FILE, AMQP::VERSION_MAJOR, AMQP::VERSION_MINOR

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from AMQP

client, client=, connect, fork, settings, start, stop

Constructor Details

#initialize(mq, name, opts = {}) ⇒ Queue

Queues store and forward messages. Queues can be configured in the server or created at runtime. Queues must be attached to at least one exchange in order to receive messages from publishers.

Like an Exchange, queue names starting with ‘amq.’ are reserved for internal use. Attempts to create queue names in violation of this reservation will raise MQ:Error (ACCESS_REFUSED).

When a queue is created without a name, the server will generate a unique name internally (not currently supported in this library).

Options

  • :passive => true | false (default false)

If set, the server will not create the exchange if it does not already exist. The client can use this to check whether an exchange exists without modifying the server state.

  • :durable => true | false (default false)

If set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue (though it is allowed).

If the queue has already been declared, any redeclaration will ignore this setting. A queue may only be declared durable the first time when it is created.

  • :exclusive => true | false (default false)

Exclusive queues may only be consumed from by the current connection. Setting the ‘exclusive’ flag always implies ‘auto-delete’. Only a single consumer is allowed to remove messages from this queue.

The default is a shared queue. Multiple clients may consume messages from this queue.

Attempting to redeclare an already-declared queue as :exclusive => true will raise MQ:Error.

  • :auto_delete = true | false (default false)

If set, the queue is deleted when all consumers have finished using it. Last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won’t be deleted.

The server waits for a short period of time before determining the queue is unused to give time to the client code to bind an exchange to it.

If the queue has been previously declared, this option is ignored on subsequent declarations.

  • :nowait => true | false (default true)

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.



64
65
66
67
68
69
70
71
72
73
# File 'lib/mq/queue.rb', line 64

def initialize mq, name, opts = {}
  @mq = mq
  @opts = opts
  @bindings ||= {}
  @mq.queues[@name = name] ||= self
  @mq.callback{
    @mq.send Protocol::Queue::Declare.new({ :queue => name,
                                            :nowait => true }.merge(opts))
  }
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



74
75
76
# File 'lib/mq/queue.rb', line 74

def name
  @name
end

Instance Method Details

#bind(exchange, opts = {}) ⇒ Object

This method binds a queue to an exchange. Until a queue is bound it will not receive any messages. In a classic messaging model, store-and-forward queues are bound to a dest exchange and subscription queues are bound to a dest_wild exchange.

A valid exchange name (or reference) must be passed as the first parameter. Both of these are valid:

exch = MQ.direct('foo exchange')
queue = MQ.queue('bar queue')
queue.bind('foo.exchange') # OR
queue.bind(exch)

It is not valid to call #bind without the exchange parameter.

It is unnecessary to call #bind when the exchange name and queue name match exactly (for direct and fanout exchanges only). There is an implicit bind which will deliver the messages from the exchange to the queue.

Options

  • :key => ‘some string’

Specifies the routing key for the binding. The routing key is used for routing messages depending on the exchange configuration. Not all exchanges use a routing key - refer to the specific exchange documentation. If the routing key is empty and the queue name is empty, the routing key will be the current queue for the channel, which is the last declared queue.

  • :nowait => true | false (default true)

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.



109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/mq/queue.rb', line 109

def bind exchange, opts = {}
  exchange = exchange.respond_to?(:name) ? exchange.name : exchange
  @bindings[exchange] = opts.clone

  @mq.callback{
    @mq.send Protocol::Queue::Bind.new({ :queue => name,
                                         :exchange => exchange,
                                         :routing_key => opts[:key],
                                         :nowait => true }.merge(opts))
  }
  self
end

#cancelledObject



423
424
425
426
427
428
429
# File 'lib/mq/queue.rb', line 423

def cancelled
  @on_cancel.call if @on_cancel
  @on_cancel = @on_msg = nil
  @mq.consumers.delete @consumer_tag
  @mq.queues.delete(@name) if @opts[:auto_delete]
  @consumer_tag = nil
end

#confirm_subscribeObject



418
419
420
421
# File 'lib/mq/queue.rb', line 418

def confirm_subscribe
  @on_confirm_subscribe.call if @on_confirm_subscribe
  @on_confirm_subscribe = nil
end

#delete(opts = {}) ⇒ Object

This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled.

Options

  • :if_unused => true | false (default false)

If set, the server will only delete the queue if it has no consumers. If the queue has consumers the server does does not delete it but raises a channel exception instead.

  • :if_empty => true | false (default false)

If set, the server will only delete the queue if it has no messages. If the queue is not empty the server raises a channel exception.

  • :nowait => true | false (default true)

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.



169
170
171
172
173
174
175
176
# File 'lib/mq/queue.rb', line 169

def delete opts = {}
  @mq.callback{
    @mq.send Protocol::Queue::Delete.new({ :queue => name,
                                           :nowait => true }.merge(opts))
  }
  @mq.queues.delete @name
  nil
end

#pop(opts = {}, &blk) ⇒ Object

This method provides a direct access to the messages in a queue using a synchronous dialogue that is designed for specific types of application where synchronous functionality is more important than performance.

The provided block is passed a single message each time pop is called.

EM.run do
  exchange = MQ.direct("foo queue")
  EM.add_periodic_timer(1) do
    exchange.publish("random number #{rand(1000)}")
  end

  # note that #bind is never called; it is implicit because
  # the exchange and queue names match
  queue = MQ.queue('foo queue')
  queue.pop { |body| puts "received payload [#{body}]" }

  EM.add_periodic_timer(1) { queue.pop }
end

If the block takes 2 parameters, both the header and the body will be passed in for processing. The header object is defined by AMQP::Protocol::Header.

EM.run do
  exchange = MQ.direct("foo queue")
  EM.add_periodic_timer(1) do
    exchange.publish("random number #{rand(1000)}")
  end

  queue = MQ.queue('foo queue')
  queue.pop do |header, body| 
    p header
    puts "received payload [#{body}]"
  end

  EM.add_periodic_timer(1) { queue.pop }
end

Options

  • :ack => true | false (default false)

If this field is set to false the server does not expect acknowledgments for messages. That is, when a message is delivered to the client the server automatically and silently acknowledges it on behalf of the client. This functionality increases performance but at the cost of reliability. Messages can get lost if a client dies before it can deliver them to the application.

  • :nowait => true | false (default true)

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/mq/queue.rb', line 242

def pop opts = {}, &blk
  if blk
    @on_pop = blk
    @on_pop_opts = opts
  end

  @mq.callback{
    @mq.get_queue{ |q|
      q.push(self)
      @mq.send Protocol::Basic::Get.new({ :queue => name,
                                          :consumer_tag => name,
                                          :no_ack => !opts[:ack],
                                          :nowait => true }.merge(opts))
    }
  }

  self
end

#publish(data, opts = {}) ⇒ Object



363
364
365
# File 'lib/mq/queue.rb', line 363

def publish data, opts = {}
  exchange.publish(data, opts)
end

#purge(opts = {}) ⇒ Object

Purge all messages from the queue.



180
181
182
183
184
185
186
# File 'lib/mq/queue.rb', line 180

def purge opts = {}
  @mq.callback{
    @mq.send Protocol::Queue::Purge.new({ :queue => name,
                                          :nowait => true }.merge(opts))
  }
  nil
end

#receive(headers, body) ⇒ Object

Passes the message to the block passed to pop or subscribe.

Performs an arity check on the block’s parameters. If arity == 1, pass only the message body. If arity != 1, pass the headers and the body to the block.

See AMQP::Protocol::Header for the hash properties available from the headers parameter. See #pop or #subscribe for a code example.



387
388
389
390
391
392
393
# File 'lib/mq/queue.rb', line 387

def receive headers, body
  headers = MQ::Header.new(@mq, headers) unless headers.nil?

  if cb = (@on_msg || @on_pop)
    cb.call *(cb.arity == 1 ? [body] : [headers, body])
  end
end

#receive_status(declare_ok) ⇒ Object



410
411
412
413
414
415
416
# File 'lib/mq/queue.rb', line 410

def receive_status declare_ok
  if @on_status
    m, c = declare_ok.message_count, declare_ok.consumer_count
    @on_status.call *(@on_status.arity == 1 ? [m] : [m, c])
    @on_status = nil
  end
end

#resetObject



431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
# File 'lib/mq/queue.rb', line 431

def reset
  @deferred_status = nil
  initialize @mq, @name, @opts

  binds = @bindings
  @bindings = {}
  binds.each{|ex,opts| bind(ex, opts) }

  if blk = @on_msg
    @on_msg = nil
    subscribe @on_msg_opts, &blk
  end

  if @on_pop
    pop @on_pop_opts, &@on_pop
  end
end

#status(opts = {}, &blk) ⇒ Object

Get the number of messages and consumers on a queue.

MQ.queue('name').status{ |num_messages, num_consumers|
 puts num_messages
}


401
402
403
404
405
406
407
408
# File 'lib/mq/queue.rb', line 401

def status opts = {}, &blk
  @on_status = blk
  @mq.callback{
    @mq.send Protocol::Queue::Declare.new({ :queue => name,
                                            :passive => true }.merge(opts))
  }
  self
end

#subscribe(opts = {}, &blk) ⇒ Object

Subscribes to asynchronous message delivery.

The provided block is passed a single message each time the exchange matches a message to this queue.

EM.run do
  exchange = MQ.direct("foo queue")
  EM.add_periodic_timer(1) do
    exchange.publish("random number #{rand(1000)}")
  end

  queue = MQ.queue('foo queue')
  queue.subscribe { |body| puts "received payload [#{body}]" }
end

If the block takes 2 parameters, both the header and the body will be passed in for processing. The header object is defined by AMQP::Protocol::Header.

EM.run do
  exchange = MQ.direct("foo queue")
  EM.add_periodic_timer(1) do
    exchange.publish("random number #{rand(1000)}")
  end

  # note that #bind is never called; it is implicit because
  # the exchange and queue names match
  queue = MQ.queue('foo queue')
  queue.subscribe do |header, body| 
    p header
    puts "received payload [#{body}]"
  end
end

Options

  • :ack => true | false (default false)

If this field is set to false the server does not expect acknowledgments for messages. That is, when a message is delivered to the client the server automatically and silently acknowledges it on behalf of the client. This functionality increases performance but at the cost of reliability. Messages can get lost if a client dies before it can deliver them to the application.

  • :nowait => true | false (default true)

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

  • :confirm => proc (default nil)

If set, this proc will be called when the server confirms subscription to the queue with a ConsumeOk message. Setting this option will automatically set :nowait => false. This is required for the server to send a confirmation.

Raises:



315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
# File 'lib/mq/queue.rb', line 315

def subscribe opts = {}, &blk
  @consumer_tag = "#{name}-#{Kernel.rand(999_999_999_999)}"
  @mq.consumers[@consumer_tag] = self

  raise Error, 'already subscribed to the queue' if subscribed?

  @on_msg = blk
  @on_msg_opts = opts
  opts[:nowait] = false if (@on_confirm_subscribe = opts[:confirm])

  @mq.callback{
    @mq.send Protocol::Basic::Consume.new({ :queue => name,
                                            :consumer_tag => @consumer_tag,
                                            :no_ack => !opts[:ack],
                                            :nowait => true }.merge(opts))
  }
  self
end

#subscribed?Boolean

Boolean check to see if the current queue has already been subscribed to an exchange.

Attempts to #subscribe multiple times to any exchange will raise an Exception. Only a single block at a time can be associated with any one queue for processing incoming messages.

Returns:

  • (Boolean)


374
375
376
# File 'lib/mq/queue.rb', line 374

def subscribed?
  !!@on_msg
end

#unbind(exchange, opts = {}) ⇒ Object

Remove the binding between the queue and exchange. The queue will not receive any more messages until it is bound to another exchange.

Due to the asynchronous nature of the protocol, it is possible for “in flight” messages to be received after this call completes. Those messages will be serviced by the last block used in a #subscribe or #pop call.

  • :nowait => true | false (default true)

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.



136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/mq/queue.rb', line 136

def unbind exchange, opts = {}
  exchange = exchange.respond_to?(:name) ? exchange.name : exchange
  @bindings.delete exchange

  @mq.callback{
    @mq.send Protocol::Queue::Unbind.new({ :queue => name,
                                           :exchange => exchange,
                                           :routing_key => opts[:key],
                                           :nowait => true }.merge(opts))
  }
  self
end

#unsubscribe(opts = {}, &blk) ⇒ Object

Removes the subscription from the queue and cancels the consumer. New messages will not be received by the queue. This call is similar in result to calling #unbind.

Due to the asynchronous nature of the protocol, it is possible for “in flight” messages to be received after this call completes. Those messages will be serviced by the last block used in a #subscribe or #pop call.

Additionally, if the queue was created with autodelete set to true, the server will delete the queue after its wait period has expired unless the queue is bound to an active exchange.

The method accepts a block which will be executed when the unsubscription request is acknowledged as complete by the server.

  • :nowait => true | false (default true)

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.



355
356
357
358
359
360
361
# File 'lib/mq/queue.rb', line 355

def unsubscribe opts = {}, &blk
  @on_cancel = blk
  @mq.callback{
    @mq.send Protocol::Basic::Cancel.new({ :consumer_tag => @consumer_tag }.merge(opts))
  }
  self
end