Class: GorgonBunny::Queue

Inherits:
Object
  • Object
show all
Includes:
Compatibility
Defined in:
lib/gorgon_bunny/lib/gorgon_bunny/queue.rb

Overview

Represents AMQP 0.9.1 queue.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Compatibility

#channel_from

Constructor Details

#initialize(channel_or_connection, name = GorgonAMQ::Protocol::EMPTY_STRING, opts = {}) ⇒ Queue

Returns a new instance of Queue.

Parameters:

  • channel_or_connection (GorgonBunny::Channel)

    Channel this queue will use. Session instances are supported only for backwards compatibility with 0.8.

  • name (String) (defaults to: GorgonAMQ::Protocol::EMPTY_STRING)

    Queue name. Pass an empty string to make RabbitMQ generate a unique one.

  • opts (Hash) (defaults to: {})

    Queue properties

Options Hash (opts):

  • :durable (Boolean) — default: false

    Should this queue be durable?

  • :auto_delete (Boolean) — default: false

    Should this queue be automatically deleted when the last consumer disconnects?

  • :exclusive (Boolean) — default: false

    Should this queue be exclusive (only can be used by this connection, removed when the connection is closed)?

  • :arguments (Boolean) — default: {}

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)

See Also:



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 38

def initialize(channel_or_connection, name = GorgonAMQ::Protocol::EMPTY_STRING, opts = {})
  # old GorgonBunny versions pass a connection here. In that case,
  # we just use default channel from it. MK.
  @channel          = channel_from(channel_or_connection)
  @name             = name
  @options          = self.class.add_default_options(name, opts)
  @consumers        = Hash.new

  @durable          = @options[:durable]
  @exclusive        = @options[:exclusive]
  @server_named     = @name.empty?
  @auto_delete      = @options[:auto_delete]
  @arguments        = @options[:arguments]

  @bindings         = Array.new

  @default_consumer = nil

  declare! unless opts[:no_declare]

  @channel.register_queue(self)
end

Instance Attribute Details

#channelGorgonBunny::Channel (readonly)

Returns Channel this queue uses.

Returns:



18
19
20
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 18

def channel
  @channel
end

#nameString (readonly)

Returns Queue name.

Returns:

  • (String)

    Queue name



20
21
22
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 20

def name
  @name
end

#optionsHash (readonly)

Returns Options this queue was created with.

Returns:

  • (Hash)

    Options this queue was created with



22
23
24
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 22

def options
  @options
end

Instance Method Details

#argumentsHash

Returns Additional optional arguments (typically used by RabbitMQ extensions and plugins).

Returns:

  • (Hash)

    Additional optional arguments (typically used by RabbitMQ extensions and plugins)



91
92
93
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 91

def arguments
  @arguments
end

#auto_delete?Boolean

Returns true if this queue was declared as automatically deleted (deleted as soon as last consumer unbinds).

Returns:

  • (Boolean)

    true if this queue was declared as automatically deleted (deleted as soon as last consumer unbinds).

See Also:



78
79
80
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 78

def auto_delete?
  @auto_delete
end

#bind(exchange, opts = {}) ⇒ Object

Binds queue to an exchange

Parameters:

Options Hash (opts):

  • :routing_key (String)

    Routing key

  • :arguments (Hash) — default: {}

    Additional optional binding arguments

See Also:



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 106

def bind(exchange, opts = {})
  @channel.queue_bind(@name, exchange, opts)

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end


  # store bindings for automatic recovery. We need to be very careful to
  # not cause an infinite rebinding loop here when we recover. MK.
  binding = { :exchange => exchange_name, :routing_key => (opts[:routing_key] || opts[:key]), :arguments => opts[:arguments] }
  @bindings.push(binding) unless @bindings.include?(binding)

  self
end

#consumer_countInteger

Returns How many active consumers the queue has.

Returns:

  • (Integer)

    How many active consumers the queue has



311
312
313
314
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 311

def consumer_count
  s = self.status
  s[:consumer_count]
end

#declare!Object



357
358
359
360
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 357

def declare!
  queue_declare_ok = @channel.queue_declare(@name, @options)
  @name = queue_declare_ok.queue
end

#delete(opts = {}) ⇒ Object

Deletes the queue

Parameters:

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • if_unused (Boolean) — default: false

    Should this queue be deleted only if it has no consumers?

  • if_empty (Boolean) — default: false

    Should this queue be deleted only if it has no messages?

See Also:



281
282
283
284
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 281

def delete(opts = {})
  @channel.deregister_queue(self)
  @channel.queue_delete(@name, opts)
end

#durable?Boolean

Returns true if this queue was declared as durable (will survive broker restart).

Returns:

  • (Boolean)

    true if this queue was declared as durable (will survive broker restart).

See Also:



64
65
66
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 64

def durable?
  @durable
end

#exclusive?Boolean

Returns true if this queue was declared as exclusive (limited to just one consumer).

Returns:

  • (Boolean)

    true if this queue was declared as exclusive (limited to just one consumer)

See Also:



71
72
73
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 71

def exclusive?
  @exclusive
end

#message_countInteger

Returns How many messages the queue has ready (e.g. not delivered but not unacknowledged).

Returns:

  • (Integer)

    How many messages the queue has ready (e.g. not delivered but not unacknowledged)



305
306
307
308
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 305

def message_count
  s = self.status
  s[:message_count]
end

#pop(opts = {:ack => false}, &block) ⇒ Array Also known as: get

Returns Triple of delivery info, message properties and message content. If the queue is empty, all three will be nils.

Examples:

conn = GorgonBunny.new
conn.start

ch   = conn.create_channel
q = ch.queue("test1")
x = ch.default_exchange
x.publish("Hello, everybody!", :routing_key => 'test1')

delivery_info, properties, payload = q.pop

puts "This is the message: " + payload + "\n\n"
conn.close

Parameters:

  • opts (Hash) (defaults to: {:ack => false})

    Options

Options Hash (opts):

  • :ack (Boolean) — default: false

    Will the message be acknowledged manually?

Returns:

  • (Array)

    Triple of delivery info, message properties and message content. If the queue is empty, all three will be nils.

See Also:



231
232
233
234
235
236
237
238
239
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 231

def pop(opts = {:ack => false}, &block)
  delivery_info, properties, content = @channel.basic_get(@name, opts)

  if block
    block.call(delivery_info, properties, content)
  else
    [delivery_info, properties, content]
  end
end

#pop_as_hash(opts = {:ack => false}, &block) ⇒ Hash

Deprecated.

Version of #pop that returns data in legacy format (as a hash).

Returns:



246
247
248
249
250
251
252
253
254
255
256
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 246

def pop_as_hash(opts = {:ack => false}, &block)
  delivery_info, properties, content = @channel.basic_get(@name, opts)

  result = {:header => properties, :payload => content, :delivery_details => delivery_info}

  if block
    block.call(result)
  else
    result
  end
end

#publish(payload, opts = {}) ⇒ Object

Publishes a message to the queue via default exchange. Takes the same arguments as Exchange#publish



265
266
267
268
269
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 265

def publish(payload, opts = {})
  @channel.default_exchange.publish(payload, opts.merge(:routing_key => @name))

  self
end

#purge(opts = {}) ⇒ Object

Purges a queue (removes all messages from it)



289
290
291
292
293
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 289

def purge(opts = {})
  @channel.queue_purge(@name, opts)

  self
end

#recover_bindingsObject



343
344
345
346
347
348
349
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 343

def recover_bindings
  @bindings.each do |b|
    # TODO: inject and use logger
    # puts "Recovering binding #{b.inspect}"
    self.bind(b[:exchange], b)
  end
end

#recover_from_network_failureObject



321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 321

def recover_from_network_failure
  if self.server_named?
    old_name = @name.dup
    @name    = GorgonAMQ::Protocol::EMPTY_STRING

    @channel.deregister_queue_named(old_name)
  end

  # TODO: inject and use logger
  # puts "Recovering queue #{@name}"
  begin
    declare!

    @channel.register_queue(self)
  rescue Exception => e
    # TODO: inject and use logger
    puts "Caught #{e.inspect} while redeclaring and registering #{@name}!"
  end
  recover_bindings
end

#server_named?Boolean

Returns true if this queue was declared as server named.

Returns:

  • (Boolean)

    true if this queue was declared as server named.

See Also:



85
86
87
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 85

def server_named?
  @server_named
end

#statusHash

Returns A hash with information about the number of queue messages and consumers.

Returns:

  • (Hash)

    A hash with information about the number of queue messages and consumers

See Also:



298
299
300
301
302
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 298

def status
  queue_declare_ok = @channel.queue_declare(@name, @options.merge(:passive => true))
  {:message_count => queue_declare_ok.message_count,
    :consumer_count => queue_declare_ok.consumer_count}
end

#subscribe(opts = { :consumer_tag => @channel.generate_consumer_tag, :ack => false, :exclusive => false, :block => false, :on_cancellation => nil }, &block) ⇒ Object

Adds a consumer to the queue (subscribes for message deliveries).

Parameters:

  • opts (Hash) (defaults to: { :consumer_tag => @channel.generate_consumer_tag, :ack => false, :exclusive => false, :block => false, :on_cancellation => nil })

    Options

Options Hash (opts):

  • :manual_ack (Boolean) — default: false

    Will this consumer use manual acknowledgements?

  • :exclusive (Boolean) — default: false

    Should this consumer be exclusive for this queue?

  • :block (Boolean) — default: false

    Should the call block calling thread?

  • :on_cancellation (#call)

    Block to execute when this consumer is cancelled remotely (e.g. via the RabbitMQ Management plugin)

  • :consumer_tag (String)

    Unique consumer identifier. It is usually recommended to let GorgonBunny generate it for you.

  • :arguments (Hash) — default: {}

    Additional (optional) arguments, typically used by RabbitMQ extensions

See Also:



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 163

def subscribe(opts = {
                :consumer_tag    => @channel.generate_consumer_tag,
                :ack             => false,
                :exclusive       => false,
                :block           => false,
                :on_cancellation => nil
              }, &block)

  ctag       = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
  consumer   = Consumer.new(@channel,
                            self,
                            ctag,
                            !(opts[:ack] || opts[:manual_ack]),
                            opts[:exclusive],
                            opts[:arguments])

  consumer.on_delivery(&block)
  consumer.on_cancellation(&opts[:on_cancellation]) if opts[:on_cancellation]

  @channel.basic_consume_with(consumer)
  if opts[:block]
    # joins current thread with the consumers pool, will block
    # the current thread for as long as the consumer pool is active
    @channel.work_pool.join
  end

  consumer
end

#subscribe_with(consumer, opts = {:block => false}) ⇒ Object

Adds a consumer object to the queue (subscribes for message deliveries).

Parameters:

Options Hash (opts):

  • block (Boolean) — default: false

    Should the call block calling thread?

See Also:



201
202
203
204
205
206
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 201

def subscribe_with(consumer, opts = {:block => false})
  @channel.basic_consume_with(consumer)

  @channel.work_pool.join if opts[:block]
  consumer
end

#unbind(exchange, opts = {}) ⇒ Object

Unbinds queue from an exchange

Parameters:

Options Hash (opts):

  • :routing_key (String)

    Routing key

  • :arguments (Hash) — default: {}

    Additional optional binding arguments

See Also:



135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 135

def unbind(exchange, opts = {})
  @channel.queue_unbind(@name, exchange, opts)

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end


  @bindings.delete_if { |b| b[:exchange] == exchange_name && b[:routing_key] == (opts[:routing_key] || opts[:key]) && b[:arguments] == opts[:arguments] }

  self
end