Class: ReliableMsg::Queue
Overview
Queue client API
Use the Queue object to put messages in queues, or get messages from queues.
You can create a Queue object that connects to a single queue by passing the queue name to the initialized. You can also access other queues by specifying the destination queue when putting a message.
For example:
queue = Queue.new 'my-queue'
# Put a message in the queue with priority 2, expiring in 30 seconds.
msg = 'lorem ipsum'
mid = queue.put msg, :priority=>2, :expires=>30
# Retrieve and process a message from the queue.
queue.get do |msg|
if msg.id == mid
print "Retrieved same message"
end
print "Message text: #{msg.object}"
end
See Queue.get and Queue.put for more examples.
Constant Summary collapse
- DEFAULT_MAX_DELIVERIES =
Default number of delivery attempts.
5
- INIT_OPTIONS =
[:expires, :delivery, :priority, :max_deliveries, :drb_uri, :tx_timeout, :connect_count]
- @@headers_cache =
Caches queue headers locally. Used by queues that retrieve a list of headers for their selectors, and can be shared by queue/selector objects operating on the same queue.
{}
Constants inherited from Client
Client::DEFAULT_CONNECT_RETRY, Client::DEFAULT_DRB_URI, Client::DEFAULT_TX_TIMEOUT, Client::DLQ, Client::DRB_PORT, Client::ERROR_INVALID_CONNECT_COUNT, Client::ERROR_INVALID_INIT_OPTION, Client::ERROR_INVALID_SELECTOR, Client::ERROR_INVALID_TX_TIMEOUT, Client::ERROR_SELECTOR_VALUE_OR_BLOCK, Client::THREAD_CURRENT_TX
Instance Method Summary collapse
-
#get(selector = nil, &block) ⇒ Object
Get a message from the queue.
-
#initialize(queue = nil, options = nil) ⇒ Queue
constructor
The optional argument
queue
specifies the queue name. -
#name ⇒ Object
Returns the queue name.
-
#put(message, headers = nil) ⇒ Object
Put a message in the queue.
Methods inherited from Client
#connect_count, #connect_count=, selector, #selector, #tx_timeout, #tx_timeout=
Constructor Details
#initialize(queue = nil, options = nil) ⇒ Queue
The optional argument queue
specifies the queue name. The application can still put messages in other queues by specifying the destination queue name in the header.
The following options can be passed to the initializer:
-
:expires
– Message expiration in seconds. Default for new messages. -
:delivery
– The message delivery mode. Default for new messages. -
:priority
– The message priority. Default for new messages. -
:max_deliveries
– Maximum number of attempts to deliver message. Default for new messages. -
:drb_uri
– DRb URI for connecting to the queue manager. Only required when using a remote queue manager, or different port. -
:tx_timeout
– Transaction timeout. See tx_timeout. -
:connect_count
– Connection attempts. See connect_count.
:call-seq:
Queue.new([name [,options]]) -> queue
72 73 74 75 76 77 78 |
# File 'lib/reliable-msg/queue.rb', line 72 def initialize queue = nil, = nil .each do |name, value| raise RuntimeError, format(ERROR_INVALID_OPTION, name) unless INIT_OPTIONS.include?(name) instance_variable_set "@#{name.to_s}".to_sym, value end if @queue = queue end |
Instance Method Details
#get(selector = nil, &block) ⇒ Object
Get a message from the queue.
Call with no arguments to retrieve the next message in the queue. Call with a message identifier to retrieve that message. Call with selectors to retrieve the first message that matches.
Selectors specify which headers to match. For example, to retrieve all messages in the with priority 2:
msg = queue.get :priority=>2
To put and get the same message:
mid = queue.put obj
msg = queue.get mid # or queue.get :id=>mid
assert(msg.obj == obj)
More complex selector expressions can be generated using Queue.selector. For example, to retrieve the next message with priority 2 or higher, created in the last 60 seconds:
selector = Queue.selector { priority >= 2 && created > now - 60 }
msg = queue.get selector
The following headers have special meaning:
-
:id
– The message identifier. -
:queue
– Select a message originally delivered to the named queue. Only used when retrieving messages from the dead-letter queue. -
:redelivery
– Specifies the re-delivery count for this message. Nil if the message is delivered (get) for the first time, one on the first attempt to re-deliver, and incremented once for each subsequent attempt. -
:created
– Indicates timestamp (in seconds) when the message was created. -
:expires_at
– Indicates timestamp (in seconds) when the message will expire,nil
if the message does not expire.
Call this method without a block to return the message. The returned object is of type Message, or nil
if no message is found.
Call this method in a block to retrieve and process the message. The block is called with the Message object, returning the result of the block. Returns nil
if no message is found.
All operations performed on the queue inside the block are part of the same transaction. The transaction commits when the block completes. However, if the block raises an exception, the transaction aborts: the message along with any message retrieved through that Queue object are returned to the queue; messages put through that Queue object are discarded. You cannot put and get the same message inside a transaction.
For example:
queue.put obj
while queue.get do |msg| # called for each message in the queue,
# until the queue is empty
... do something with msg ...
queue.put obj # puts another message in the queue
true
end
This loop will only complete if it raises an exception, since it gets one message from the queue and puts another message in its place. After an exception, there will be at least one message in the queue.
Each attempt to process a message increases its retry count. When the retry count (:retry
) reaches the maximum allowed (:max_retry
), the message is moved to the dead-letter queue.
This method does not block and returns immediately if there is no message in the queue. To continue processing all messages in the queue:
while true # repeat forever
while true
break unless queue.get do |msg|
... do something with msg ...
true
end
end
sleep 5 # no messages, wait
end
:call-seq:
queue.get([selector]) -> msg or nil
queue.get([selector]) {|msg| ... } -> obj
See: Message
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/reliable-msg/queue.rb', line 228 def get selector = nil, &block tx = old_tx = Thread.current[THREAD_CURRENT_TX] # If block, begin a new transaction. if block tx = {:qm=>qm} tx[:tid] = tx[:qm].begin tx_timeout Thread.current[THREAD_CURRENT_TX] = tx end result = begin # Validate the selector: nil, string or hash. selector = case selector when String {:id=>selector} when Hash, Selector, nil selector else raise ArgumentError, ERROR_INVALID_SELECTOR end # If using selector object, obtain a list of all message headers # for the queue (shared by all Queue/Selector objects accessing # the same queue) and run the selector on that list. Pick one # message and switch to an :id selector to retrieve it. if selector.is_a?(Selector) cached = @@headers_cache[@queue] ||= CachedHeaders.new id = cached.next(selector) do if tx tx[:qm].list :queue=>@queue, :tid=>tx[:tid] else repeated { |qm| qm.list :queue=>@queue } end end return nil unless id selector = {:id=>id} end # If inside a transaction, always retrieve from the same queue manager, # otherwise, allow repeated() to try and access multiple queue managers. = if tx tx[:qm].enqueue :queue=>@queue, :selector=>selector, :tid=>tx[:tid] else repeated { |qm| qm.enqueue :queue=>@queue, :selector=>selector } end # Result is either message, or result from processing block. Note that # calling block may raise an exception. We deserialize the message here # for two reasons: # 1. It creates a distinct copy, so changing the message object and returning # it to the queue (abort) does not affect other consumers. # 2. The message may rely on classes known to the client but not available # to the queue manager. result = if = Message.new([:id], [:headers], Marshal::load([:message])) block ? block.call() : end rescue Exception=>error # Abort the transaction if we started it. Propagate error. qm.abort(tx[:tid]) if block raise error ensure # Resume the old transaction. Thread.current[THREAD_CURRENT_TX] = old_tx if block end # Commit the transaction and return the result. We do this outside the main # block, since we don't abort in case of error (commit is one-phase) and we # don't retain the transaction association, it completes by definition. qm.commit(tx[:tid]) if block result end |
#name ⇒ Object
Returns the queue name.
297 298 299 |
# File 'lib/reliable-msg/queue.rb', line 297 def name @queue end |
#put(message, headers = nil) ⇒ Object
Put a message in the queue.
The message
argument is required, but may be nil
Headers are optional. Headers are used to provide the application with additional information about the message, and can be used to retrieve messages (see Queue.get for discussion of selectors). Some headers are used to handle message processing internally (e.g. :priority
, :expires
).
Each header uses a symbol for its name. The value may be string, numeric, true/false or nil. No other objects are allowed. To improve performance, keep headers as small as possible.
The following headers have special meaning:
-
:delivery
– The message delivery mode. -
:queue
– Puts the message in the named queue. Otherwise, uses the queue specified when creating this Queue object. -
:priority
– The message priority. Messages with higher priority are retrieved first. -
:expires
– Message expiration in seconds. Messages do not expire unless specified. Zero ornil
means no expiration. -
:expires_at
– Specifies when the message expires (timestamp). Alternative to:expires
. -
:max_deliveries
– Maximum number of attempts to deliver message, afterwhich message moves to the DLQ. Minimum is 1 (deliver only once), default is 5 (deliver up to 5 times).
Headers can be set on a per-queue basis when the Queue is created. This only affects messages put through that Queue object.
Messages can be delivered using one of three delivery modes:
-
:best_effort
– Attempt to deliver the message once. If the message expires or cannot be delivered, discard the message. The is the default delivery mode. -
:repeated
– Attempt to deliver until message expires, or up to maximum delivery attempts (see:max_deliveries
). Afterwards, move message to dead-letter queue. -
:once
– Attempt to deliver message exactly once. If message expires, or first delivery attempt fails, move message to dead-letter queue.
For example:
queue.put request
queue.put notice, :expires=>10
queue.put object, :queue=>'other-queue'
:call-seq:
queue.put(message[, headers]) -> id
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/reliable-msg/queue.rb', line 128 def put , headers = nil tx = Thread.current[THREAD_CURRENT_TX] # Use headers supplied by callers, or defaults for this queue. defaults = { :priority => @priority || 0, :expires => @expires, :max_deliveries => @max_deliveries || DEFAULT_MAX_DELIVERIES, :delivery => @delivery || :best_effort } headers = headers ? defaults.merge(headers) : defaults # Serialize the message before sending to queue manager. We need the # message to be serialized for storage, this just saves duplicate # serialization when using DRb. = Marshal::dump # If inside a transaction, always send to the same queue manager, otherwise, # allow repeated() to try and access multiple queue managers. if tx return tx[:qm].queue(:message=>, :headers=>headers, :queue=>(headers[:queue] || @queue), :tid=>tx[:tid]) else return repeated { |qm| qm.queue :message=>, :headers=>headers, :queue=>(headers[:queue] || @queue) } end end |