Class: Bunny::Queue09
- Inherits:
-
Qrack::Queue
- Object
- Qrack::Queue
- Bunny::Queue09
- Defined in:
- lib/bunny/queue09.rb
Overview
Queues store and forward messages. Queues can be configured in the server or created at runtime. Queues must be attached to at least one exchange in order to receive messages from publishers.
Instance Attribute Summary
Attributes inherited from Qrack::Queue
Instance Method Summary collapse
-
#ack(opts = {}) ⇒ Object
Acknowledges one or more messages delivered via the Deliver or Get_-_Ok methods.
-
#bind(exchange, opts = {}) ⇒ Symbol
Binds a queue to an exchange.
-
#default_consumer ⇒ Bunny::Consumer
Default consumer associated with this queue (if any), or nil.
-
#delete(opts = {}) ⇒ Symbol
Requests that a queue is deleted from broker/server.
-
#initialize(client, name, opts = {}) ⇒ Queue09
constructor
A new instance of Queue09.
-
#pop(opts = {}, &blk) ⇒ Hash
Gets a message from a queue in a synchronous way.
-
#purge(opts = {}) ⇒ Symbol
Removes all messages from a queue.
-
#status ⇒ Hash
Hash with keys
:message_count
andconsumer_count
. - #subscribe(opts = {}, &blk) ⇒ Object
-
#unbind(exchange, opts = {}) ⇒ Symbol
Removes a queue binding from an exchange.
-
#unsubscribe(opts = {}) ⇒ Symbol
Cancels a consumer.
Methods inherited from Qrack::Queue
#consumer_count, #message_count, #publish
Constructor Details
#initialize(client, name, opts = {}) ⇒ Queue09
Returns a new instance of Queue09.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/bunny/queue09.rb', line 8 def initialize(client, name, opts = {}) # check connection to server raise Bunny::ConnectionError, 'Not connected to server' if client.status == :not_connected @client = client @opts = opts @delivery_tag = nil # Queues without a given name are named by the server and are generally # bound to the process that created them. if !name opts = { :passive => false, :durable => false, :exclusive => true, :auto_delete => true, :deprecated_ticket => 0 }.merge(opts) end # ignore the :nowait option if passed, otherwise program will hang waiting for a # response that will not be sent by the server opts.delete(:nowait) opts = { :queue => name || '', :nowait => false, :deprecated_ticket => 0 }.merge(opts) client.send_frame(Qrack::Protocol09::Queue::Declare.new(opts)) method = client.next_method client.check_response(method, Qrack::Protocol09::Queue::DeclareOk, "Error declaring queue #{name}") @name = method.queue client.queues[@name] = self end |
Instance Method Details
#ack(opts = {}) ⇒ Object
Acknowledges one or more messages delivered via the Deliver or Get_-_Ok methods. The client can ask to confirm a single message or a set of messages up to and including a specific message.
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/bunny/queue09.rb', line 71 def ack(opts = {}) # Set delivery tag if delivery_tag.nil? and opts[:delivery_tag].nil? raise Bunny::AcknowledgementError, "No delivery tag received" else self.delivery_tag = opts[:delivery_tag] if delivery_tag.nil? end opts = {:delivery_tag => delivery_tag, :multiple => false}.merge(opts) client.send_frame(Qrack::Protocol09::Basic::Ack.new(opts)) # reset delivery tag self.delivery_tag = nil end |
#bind(exchange, opts = {}) ⇒ Symbol
Binds a queue to an exchange. Until a queue is bound it won’t receive any messages. Queues are bound to the direct exchange ’’ by default. If error occurs, a ProtocolError is raised.
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/bunny/queue09.rb', line 99 def bind(exchange, opts = {}) exchange = exchange.respond_to?(:name) ? exchange.name : exchange # ignore the :nowait option if passed, otherwise program will hang waiting for a # response that will not be sent by the server opts.delete(:nowait) opts = { :queue => name, :exchange => exchange, :routing_key => opts.delete(:key), :nowait => false, :deprecated_ticket => 0 }.merge(opts) client.send_frame(Qrack::Protocol09::Queue::Bind.new(opts)) method = client.next_method client.check_response(method, Qrack::Protocol09::Queue::BindOk, "Error binding queue: #{name} to exchange: #{exchange}") # return message :bind_ok end |
#default_consumer ⇒ Bunny::Consumer
Default consumer is the one registered with the convenience Bunny::Queue#subscribe method. It has no special properties of any kind.
Returns Default consumer associated with this queue (if any), or nil.
49 50 51 |
# File 'lib/bunny/queue09.rb', line 49 def default_consumer @default_consumer end |
#delete(opts = {}) ⇒ Symbol
Requests that a queue is deleted from broker/server. When a queue is deleted any pending messages
are sent to a dead-letter queue if this is defined in the server configuration. Removes reference
from queues if successful. If an error occurs raises Bunny::ProtocolError
.
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/bunny/queue09.rb', line 140 def delete(opts = {}) # ignore the :nowait option if passed, otherwise program will hang waiting for a # response that will not be sent by the server opts.delete(:nowait) opts = { :queue => name, :nowait => false, :deprecated_ticket => 0 }.merge(opts) client.send_frame(Qrack::Protocol09::Queue::Delete.new(opts)) method = client.next_method client.check_response(method, Qrack::Protocol09::Queue::DeleteOk, "Error deleting queue #{name}") client.queues.delete(name) # return confirmation :delete_ok end |
#pop(opts = {}, &blk) ⇒ Hash
Gets a message from a queue in a synchronous way. If error occurs, raises Bunny_::_ProtocolError.
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
# File 'lib/bunny/queue09.rb', line 168 def pop(opts = {}, &blk) opts = { :queue => name, :consumer_tag => name, :no_ack => !opts[:ack], :nowait => true, :deprecated_ticket => 0 }.merge(opts) client.send_frame(Qrack::Protocol09::Basic::Get.new(opts)) method = client.next_method if method.is_a?(Qrack::Protocol09::Basic::GetEmpty) then queue_empty = true elsif !method.is_a?(Qrack::Protocol09::Basic::GetOk) raise Bunny::ProtocolError, "Error getting message from queue #{name}" end if !queue_empty # get delivery tag to use for acknowledge self.delivery_tag = method.delivery_tag if opts[:ack] header = client.next_payload # If maximum frame size is smaller than message payload body then message # will have a message header and several message bodies msg = '' while msg.length < header.size msg << client.next_payload end msg_hash = {:header => header, :payload => msg, :delivery_details => method.arguments} else msg_hash = {:header => nil, :payload => :queue_empty, :delivery_details => nil} end # Pass message hash to block or return message hash blk ? blk.call(msg_hash) : msg_hash end |
#purge(opts = {}) ⇒ Symbol
Removes all messages from a queue. It does not cancel consumers. Purged messages are deleted without any formal “undo” mechanism. If an error occurs raises ProtocolError.
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/bunny/queue09.rb', line 217 def purge(opts = {}) # ignore the :nowait option if passed, otherwise program will hang waiting for a # response that will not be sent by the server opts.delete(:nowait) opts = { :queue => name, :nowait => false, :deprecated_ticket => 0 }.merge(opts) client.send_frame(Qrack::Protocol09::Queue::Purge.new(opts)) method = client.next_method client.check_response(method, Qrack::Protocol09::Queue::PurgeOk, "Error purging queue #{name}") # return confirmation :purge_ok end |
#status ⇒ Hash
Returns Hash with keys :message_count
and consumer_count
.
235 236 237 238 239 240 241 |
# File 'lib/bunny/queue09.rb', line 235 def status opts = { :queue => name, :passive => true, :deprecated_ticket => 0 } client.send_frame(Qrack::Protocol09::Queue::Declare.new(opts)) method = client.next_method {:message_count => method., :consumer_count => method.consumer_count} end |
#subscribe(opts = {}, &blk) ⇒ Object
243 244 245 246 247 248 249 |
# File 'lib/bunny/queue09.rb', line 243 def subscribe(opts = {}, &blk) raise RuntimeError.new("This queue already has default consumer. Please instantiate Bunny::Consumer directly and call its #consume method to register additional consumers.") if @default_consumer && ! opts[:consumer_tag] # Create a subscription. @default_consumer = self.class.consumer_class.new(client, self, opts) @default_consumer.consume(&blk) end |
#unbind(exchange, opts = {}) ⇒ Symbol
Removes a queue binding from an exchange. If error occurs, a Bunny_::_ProtocolError is raised.
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/bunny/queue09.rb', line 260 def unbind(exchange, opts = {}) exchange = exchange.respond_to?(:name) ? exchange.name : exchange # ignore the :nowait option if passed, otherwise program will hang waiting for a # response that will not be sent by the server opts.delete(:nowait) opts = { :queue => name, :exchange => exchange, :routing_key => opts.delete(:key), :nowait => false, :deprecated_ticket => 0 }.merge(opts) client.send_frame(Qrack::Protocol09::Queue::Unbind.new(opts)) method = client.next_method client.check_response(method, Qrack::Protocol09::Queue::UnbindOk, "Error unbinding queue #{name}") # return message :unbind_ok end |
#unsubscribe(opts = {}) ⇒ Symbol
Cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer.
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 |
# File 'lib/bunny/queue09.rb', line 295 def unsubscribe(opts = {}) # Default consumer_tag from subscription if not passed in consumer_tag = @default_consumer ? @default_consumer.consumer_tag : opts[:consumer_tag] # Must have consumer tag to tell server what to unsubscribe raise Bunny::UnsubscribeError, "No consumer tag received" if !consumer_tag # Cancel consumer client.send_frame(Qrack::Protocol09::Basic::Cancel.new(:consumer_tag => consumer_tag, :nowait => false)) method = client.next_method client.check_response(method, Qrack::Protocol09::Basic::CancelOk, "Error unsubscribing from queue #{name}") # Reset subscription @default_consumer = nil # Return confirmation :unsubscribe_ok end |