Class: GorgonBunny::Queue
- Inherits:
- Object (GorgonBunny::Queue < Object)
- Includes:
- Compatibility
- Defined in:
- lib/gorgon_bunny/lib/gorgon_bunny/queue.rb
Overview
Represents an AMQP 0.9.1 queue.
Instance Attribute Summary
-
#channel ⇒ GorgonBunny::Channel
readonly
Channel this queue uses.
-
#name ⇒ String
readonly
Queue name.
-
#options ⇒ Hash
readonly
Options this queue was created with.
Instance Method Summary
-
#arguments ⇒ Hash
Additional optional arguments (typically used by RabbitMQ extensions and plugins).
-
#auto_delete? ⇒ Boolean
True if this queue was declared as automatically deleted (deleted as soon as the last consumer unsubscribes).
-
#bind(exchange, opts = {}) ⇒ Object
Binds queue to an exchange.
-
#consumer_count ⇒ Integer
How many active consumers the queue has.
- #declare! ⇒ Object
-
#delete(opts = {}) ⇒ Object
Deletes the queue.
-
#durable? ⇒ Boolean
True if this queue was declared as durable (will survive broker restart).
-
#exclusive? ⇒ Boolean
True if this queue was declared as exclusive (usable only by the connection that declared it).
-
#initialize(channel_or_connection, name = GorgonAMQ::Protocol::EMPTY_STRING, opts = {}) ⇒ Queue
constructor
A new instance of Queue.
-
#message_count ⇒ Integer
How many messages the queue has ready (i.e. waiting for delivery; delivered but unacknowledged messages are not counted).
-
#pop(opts = {:ack => false}, &block) ⇒ Array
(also: #get)
Triple of delivery info, message properties and message content.
- #pop_as_hash(opts = {:ack => false}, &block) ⇒ Hash (deprecated)
-
#publish(payload, opts = {}) ⇒ Object
Publishes a message to the queue via default exchange.
-
#purge(opts = {}) ⇒ Object
Purges a queue (removes all messages from it).
- #recover_bindings ⇒ Object
- #recover_from_network_failure ⇒ Object
-
#server_named? ⇒ Boolean
True if this queue was declared as server named.
-
#status ⇒ Hash
A hash with information about the number of queue messages and consumers.
-
#subscribe(opts = { :consumer_tag => @channel.generate_consumer_tag, :ack => false, :exclusive => false, :block => false, :on_cancellation => nil }, &block) ⇒ Object
Adds a consumer to the queue (subscribes for message deliveries).
-
#subscribe_with(consumer, opts = {:block => false}) ⇒ Object
Adds a consumer object to the queue (subscribes for message deliveries).
-
#unbind(exchange, opts = {}) ⇒ Object
Unbinds queue from an exchange.
Methods included from Compatibility
Constructor Details
#initialize(channel_or_connection, name = GorgonAMQ::Protocol::EMPTY_STRING, opts = {}) ⇒ Queue
Returns a new instance of Queue.
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 38

def initialize(channel_or_connection, name = GorgonAMQ::Protocol::EMPTY_STRING, opts = {})
  # old GorgonBunny versions pass a connection here. In that case,
  # we just use default channel from it. MK.
  @channel          = channel_from(channel_or_connection)
  @name             = name
  @options          = self.class.(name, opts)
  @consumers        = Hash.new

  @durable          = @options[:durable]
  @exclusive        = @options[:exclusive]
  @server_named     = @name.empty?
  @auto_delete      = @options[:auto_delete]
  @arguments        = @options[:arguments]

  @bindings         = Array.new

  @default_consumer = nil

  declare! unless opts[:no_declare]

  @channel.register_queue(self)
end
Instance Attribute Details
#channel ⇒ GorgonBunny::Channel (readonly)
Returns the channel this queue uses.
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 18

def channel
  @channel
end
#name ⇒ String (readonly)
Returns the queue name.
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 20

def name
  @name
end
#options ⇒ Hash (readonly)
Returns the options this queue was created with.
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 22

def options
  @options
end
Instance Method Details
#arguments ⇒ Hash
Returns additional optional arguments (typically used by RabbitMQ extensions and plugins).
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 91

def arguments
  @arguments
end
#auto_delete? ⇒ Boolean
Returns true if this queue was declared as automatically deleted (deleted as soon as the last consumer unsubscribes).
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 78

def auto_delete?
  @auto_delete
end
#bind(exchange, opts = {}) ⇒ Object
Binds the queue to an exchange.
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 106

def bind(exchange, opts = {})
  @channel.queue_bind(@name, exchange, opts)

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end

  # store bindings for automatic recovery. We need to be very careful to
  # not cause an infinite rebinding loop here when we recover. MK.
  binding = { :exchange => exchange_name, :routing_key => (opts[:routing_key] || opts[:key]), :arguments => opts[:arguments] }
  @bindings.push(binding) unless @bindings.include?(binding)

  self
end
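The bookkeeping in #bind can be tried without a broker. The sketch below uses a hypothetical stand-in class, BindingLog, which is not part of GorgonBunny; it copies only the hash-building step and the include? guard from the listing above. It is meant to show that binding again with the same exchange, routing key and arguments does not grow the list, which is what lets recovery replay bindings without piling up duplicates.

```ruby
# Hypothetical stand-in for illustration; not part of GorgonBunny.
# It reproduces only the binding bookkeeping from Queue#bind.
class BindingLog
  attr_reader :bindings

  def initialize
    @bindings = []
  end

  # Accepts an exchange object (anything responding to #name) or a plain name.
  def record(exchange, opts = {})
    exchange_name = exchange.respond_to?(:name) ? exchange.name : exchange

    binding = {
      :exchange    => exchange_name,
      :routing_key => (opts[:routing_key] || opts[:key]),
      :arguments   => opts[:arguments]
    }
    # Hash equality compares keys and values, so an identical binding is skipped.
    @bindings.push(binding) unless @bindings.include?(binding)
    self
  end
end

log = BindingLog.new
log.record("logs", :routing_key => "error")
log.record("logs", :key => "error")         # legacy :key alias, same binding
log.record("logs", :routing_key => "warn")
```

Because :key is folded into :routing_key before the comparison, the first two calls describe the same binding and only one entry is stored for them.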
#consumer_count ⇒ Integer
Returns the number of active consumers the queue has.
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 311

def consumer_count
  s = self.status
  s[:consumer_count]
end
#declare! ⇒ Object
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 357

def declare!
  queue_declare_ok = @channel.queue_declare(@name, @options)
  @name = queue_declare_ok.queue
end
#delete(opts = {}) ⇒ Object
Deletes the queue.
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 281

def delete(opts = {})
  @channel.deregister_queue(self)
  @channel.queue_delete(@name, opts)
end
#durable? ⇒ Boolean
Returns true if this queue was declared as durable (will survive broker restart).
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 64

def durable?
  @durable
end
#exclusive? ⇒ Boolean
Returns true if this queue was declared as exclusive (usable only by the connection that declared it).
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 71

def exclusive?
  @exclusive
end
#message_count ⇒ Integer
Returns the number of messages the queue has ready (i.e. waiting for delivery; delivered but unacknowledged messages are not counted).
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 305

def message_count
  s = self.status
  s[:message_count]
end
#pop(opts = {:ack => false}, &block) ⇒ Array Also known as: get
Returns a triple of delivery info, message properties and message content. If the queue is empty, all three will be nil.
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 231

def pop(opts = {:ack => false}, &block)
  delivery_info, properties, content = @channel.basic_get(@name, opts)

  if block
    block.call(delivery_info, properties, content)
  else
    [delivery_info, properties, content]
  end
end
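The two calling styles of #pop differ in what comes back: without a block the caller gets the triple, with a block the caller gets whatever the block returns. The sketch below illustrates this without a broker. FakeChannel and pop_from are hypothetical names invented for this example, and the contents of the delivery info and properties hashes are made up; only the control flow is taken from the listing above.

```ruby
# Hypothetical stand-ins for illustration; neither is part of GorgonBunny.
class FakeChannel
  def initialize(messages)
    @messages = messages
  end

  # Mimics the shape of a basic.get result: a triple, or three nils when empty.
  def basic_get(_queue_name, _opts)
    payload = @messages.shift
    return [nil, nil, nil] if payload.nil?

    [{ :delivery_tag => 1 }, { :content_type => "text/plain" }, payload]
  end
end

# Same control flow as Queue#pop, with the channel passed in explicitly.
def pop_from(channel, name, opts = { :ack => false }, &block)
  delivery_info, properties, content = channel.basic_get(name, opts)

  if block
    block.call(delivery_info, properties, content)
  else
    [delivery_info, properties, content]
  end
end

channel = FakeChannel.new(["hello"])
_info, _props, first = pop_from(channel, "jobs")  # triple, destructured
empty = pop_from(channel, "jobs")                 # queue is now drained

other = FakeChannel.new(["again"])
block_result = pop_from(other, "jobs") { |_i, _p, content| content.upcase }
```

Callers that destructure the result should check the content for nil before using it, since an empty queue still yields a three-element array.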
#pop_as_hash(opts = {:ack => false}, &block) ⇒ Hash
Version of #pop that returns data in legacy format (as a hash).
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 246

def pop_as_hash(opts = {:ack => false}, &block)
  delivery_info, properties, content = @channel.basic_get(@name, opts)

  result = {:header => properties, :payload => content, :delivery_details => delivery_info}

  if block
    block.call(result)
  else
    result
  end
end
#publish(payload, opts = {}) ⇒ Object
Publishes a message to the queue via the default exchange. Takes the same arguments as Exchange#publish.
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 265

def publish(payload, opts = {})
  @channel.default_exchange.publish(payload, opts.merge(:routing_key => @name))

  self
end
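One consequence of the opts.merge(:routing_key => @name) call is that a :routing_key supplied by the caller is overwritten by the queue name. The snippet below shows this with plain Hash#merge; the queue name and option values are made up for illustration.

```ruby
# Plain Hash#merge, as used by Queue#publish; the values below are made up.
queue_name = "jobs"

caller_opts = { :persistent => true, :routing_key => "ignored" }

# On a key collision, merge keeps the value from its argument,
# so the queue name replaces the caller's routing key.
effective = caller_opts.merge(:routing_key => queue_name)
```

Hash#merge returns a new hash, so the caller's options hash is left unmodified.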
#purge(opts = {}) ⇒ Object
Purges a queue (removes all messages from it).
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 289

def purge(opts = {})
  @channel.queue_purge(@name, opts)

  self
end
#recover_bindings ⇒ Object
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 343

def recover_bindings
  @bindings.each do |b|
    # TODO: inject and use logger
    # puts "Recovering binding #{b.inspect}"
    self.bind(b[:exchange], b)
  end
end
#recover_from_network_failure ⇒ Object
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 321

def recover_from_network_failure
  if self.server_named?
    old_name = @name.dup
    @name    = GorgonAMQ::Protocol::EMPTY_STRING

    @channel.deregister_queue_named(old_name)
  end

  # TODO: inject and use logger
  # puts "Recovering queue #{@name}"
  begin
    declare!

    @channel.register_queue(self)
  rescue Exception => e
    # TODO: inject and use logger
    puts "Caught #{e.inspect} while redeclaring and registering #{@name}!"
  end
  recover_bindings
end
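The first step of recovery matters for server-named queues: the old generated name is dropped so that redeclaring with an empty name lets the broker pick a fresh one, while explicitly named queues keep their name. The sketch below models just that step. FakeBroker and RecoverableQueue are hypothetical classes written for this example, and the "amq.gen-" counter naming is a simplification of what a real broker generates.

```ruby
# Hypothetical stand-ins for illustration; not part of GorgonBunny.
class FakeBroker
  def initialize
    @counter = 0
  end

  # An empty name asks the broker to generate one; otherwise the name is kept.
  def queue_declare(name)
    return name unless name.empty?

    @counter += 1
    "amq.gen-#{@counter}"
  end
end

class RecoverableQueue
  attr_reader :name

  def initialize(broker, name = "")
    @broker       = broker
    @server_named = name.empty?
    @name         = @broker.queue_declare(name)
  end

  # Same idea as the first step of recover_from_network_failure:
  # a server-named queue forgets its old name before redeclaring.
  def recover
    @name = "" if @server_named
    @name = @broker.queue_declare(@name)
  end
end

broker    = FakeBroker.new
anonymous = RecoverableQueue.new(broker)
named     = RecoverableQueue.new(broker, "jobs")

old_name = anonymous.name
anonymous.recover
named.recover
```

After recovery the anonymous queue carries a different name than before, which is why code should read the name from the queue object rather than caching it.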
#server_named? ⇒ Boolean
Returns true if this queue was declared as server named.
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 85

def server_named?
  @server_named
end
#status ⇒ Hash
Returns a hash with information about the number of queue messages and consumers.
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 298

def status
  queue_declare_ok = @channel.queue_declare(@name, @options.merge(:passive => true))
  {:message_count => queue_declare_ok.message_count,
    :consumer_count => queue_declare_ok.consumer_count}
end
#subscribe(opts = { :consumer_tag => @channel.generate_consumer_tag, :ack => false, :exclusive => false, :block => false, :on_cancellation => nil }, &block) ⇒ Object
Adds a consumer to the queue (subscribes for message deliveries).
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 163

def subscribe(opts = {
                :consumer_tag    => @channel.generate_consumer_tag,
                :ack             => false,
                :exclusive       => false,
                :block           => false,
                :on_cancellation => nil
              }, &block)

  ctag       = opts.fetch(:consumer_tag, @channel.generate_consumer_tag)
  consumer   = Consumer.new(@channel,
                            self,
                            ctag,
                            !(opts[:ack] || opts[:manual_ack]),
                            opts[:exclusive],
                            opts[:arguments])

  consumer.on_delivery(&block)
  consumer.on_cancellation(&opts[:on_cancellation]) if opts[:on_cancellation]

  @channel.basic_consume_with(consumer)
  if opts[:block]
    # joins current thread with the consumers pool, will block
    # the current thread for as long as the consumer pool is active
    @channel.work_pool.join
  end

  consumer
end
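Two details of the option handling in #subscribe are easy to miss. The fourth argument passed to Consumer.new is the negation of :ack or :manual_ack (presumably a no-ack flag; Consumer's signature is not shown on this page, so that reading is an assumption). Also, because the defaults live in the parameter's default value, passing any hash replaces them wholesale, so keys the caller omits read as nil rather than false. The helper below, consumer_settings, is a hypothetical name used only to make both points observable.

```ruby
# Hypothetical helper mirroring how Queue#subscribe reads its options.
def consumer_settings(opts = { :ack => false, :exclusive => false, :block => false })
  {
    # Fourth argument handed to Consumer.new in the listing above.
    :no_ack    => !(opts[:ack] || opts[:manual_ack]),
    :exclusive => opts[:exclusive],
    :block     => opts[:block]
  }
end

defaults = consumer_settings                        # default hash is used
manual   = consumer_settings(:manual_ack => true)   # default hash is replaced
legacy   = consumer_settings(:ack => true)          # older spelling, same effect
```

Since nil is falsy in Ruby, the missing :exclusive and :block keys behave like false in the conditionals of the method, even though they are not literally false.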
#subscribe_with(consumer, opts = {:block => false}) ⇒ Object
Adds a consumer object to the queue (subscribes for message deliveries).
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 201

def subscribe_with(consumer, opts = {:block => false})
  @channel.basic_consume_with(consumer)

  @channel.work_pool.join if opts[:block]
  consumer
end
#unbind(exchange, opts = {}) ⇒ Object
Unbinds the queue from an exchange.
# File 'lib/gorgon_bunny/lib/gorgon_bunny/queue.rb', line 135

def unbind(exchange, opts = {})
  @channel.queue_unbind(@name, exchange, opts)

  exchange_name = if exchange.respond_to?(:name)
                    exchange.name
                  else
                    exchange
                  end

  @bindings.delete_if { |b| b[:exchange] == exchange_name && b[:routing_key] == (opts[:routing_key] || opts[:key]) && b[:arguments] == opts[:arguments] }

  self
end