Class: Sneakers::Handlers::Maxretry
- Inherits:
-
Object
- Object
- Sneakers::Handlers::Maxretry
- Defined in:
- lib/sneakers/handlers/maxretry.rb
Overview
Maxretry uses dead letter policies on Rabbitmq to requeue and retry messages after failure (rejections and errors). When the maximum number of retries is reached it will put the message on an error queue. This handler will only retry at the queue level. To accomplish that, the setup is a bit complex.
Input:
worker_exchange (eXchange)
worker_queue (Queue)
We create:
worker_queue-retry - (X) where we setup the worker queue to dead-letter.
worker_queue-retry - (Q) queue bound to ^ exchange, dead-letters to
worker_queue-retry-requeue.
worker_queue-error - (X) where to send max-retry failures
worker_queue-error - (Q) bound to worker_queue-error.
worker_queue-retry-requeue - (X) exchange to bind worker_queue to for
requeuing directly to the worker_queue.
This requires that you setup arguments to the worker queue to line up the dead letter queue. See the example for more information.
Many of these can be override with options:
-
retry_exchange - sets retry exchange & queue
-
retry_error_exchange - sets error exchange and queue
-
retry_requeue_exchange - sets the exchange created to re-queue things back to the worker queue.
Class Method Summary collapse
Instance Method Summary collapse
- #acknowledge(hdr, props, msg) ⇒ Object
- #error(hdr, props, msg, err) ⇒ Object
-
#initialize(channel, queue, opts) ⇒ Maxretry
constructor
A new instance of Maxretry.
- #noop(hdr, props, msg) ⇒ Object
- #reject(hdr, props, msg, requeue = false) ⇒ Object
Constructor Details
#initialize(channel, queue, opts) ⇒ Maxretry
Returns a new instance of Maxretry.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/sneakers/handlers/maxretry.rb', line 36 def initialize(channel, queue, opts) @worker_queue_name = queue.name Sneakers.logger.debug do "#{log_prefix} creating handler, opts=#{opts}" end @channel = channel @opts = opts # Construct names, defaulting where suitable retry_name = @opts[:retry_exchange] || "#{@worker_queue_name}-retry" error_name = @opts[:retry_error_exchange] || "#{@worker_queue_name}-error" requeue_name = @opts[:retry_requeue_exchange] || "#{@worker_queue_name}-retry-requeue" retry_routing_key = @opts[:retry_routing_key] || "#" # Create the exchanges @retry_exchange, @error_exchange, @requeue_exchange = [retry_name, error_name, requeue_name].map do |name| Sneakers.logger.debug { "#{log_prefix} creating exchange=#{name}" } @channel.exchange(name, :type => 'topic', :durable => exchange_durable?) end # Create the queues and bindings Sneakers.logger.debug do "#{log_prefix} creating queue=#{retry_name} x-dead-letter-exchange=#{requeue_name}" end @retry_queue = @channel.queue(retry_name, :durable => queue_durable?, :arguments => { :'x-dead-letter-exchange' => requeue_name, :'x-message-ttl' => @opts[:retry_timeout] || 60000 }) @retry_queue.bind(@retry_exchange, :routing_key => '#') Sneakers.logger.debug do "#{log_prefix} creating queue=#{error_name}" end @error_queue = @channel.queue(error_name, :durable => queue_durable?) @error_queue.bind(@error_exchange, :routing_key => '#') # Finally, bind the worker queue to our requeue exchange queue.bind(@requeue_exchange, :routing_key => retry_routing_key) @max_retries = @opts[:retry_max_times] || 5 end |
Class Method Details
.configure_queue(name, opts) ⇒ Object
85 86 87 88 89 90 |
# File 'lib/sneakers/handlers/maxretry.rb', line 85 def self.configure_queue(name, opts) retry_name = opts.fetch(:retry_exchange, "#{name}-retry") opt_args = opts[:queue_options][:arguments] ? opts[:queue_options][:arguments].inject({}){|memo,(k,v)| memo[k.to_sym] = v; memo} : {} opts[:queue_options][:arguments] = { :'x-dead-letter-exchange' => retry_name }.merge(opt_args) opts[:queue_options] end |
Instance Method Details
#acknowledge(hdr, props, msg) ⇒ Object
92 93 94 |
# File 'lib/sneakers/handlers/maxretry.rb', line 92 def acknowledge(hdr, props, msg) @channel.acknowledge(hdr.delivery_tag, false) end |
#error(hdr, props, msg, err) ⇒ Object
107 108 109 |
# File 'lib/sneakers/handlers/maxretry.rb', line 107 def error(hdr, props, msg, err) handle_retry(hdr, props, msg, err) end |
#noop(hdr, props, msg) ⇒ Object
111 112 113 |
# File 'lib/sneakers/handlers/maxretry.rb', line 111 def noop(hdr, props, msg) end |
#reject(hdr, props, msg, requeue = false) ⇒ Object
96 97 98 99 100 101 102 103 104 |
# File 'lib/sneakers/handlers/maxretry.rb', line 96 def reject(hdr, props, msg, requeue = false) if requeue # This was explicitly rejected specifying it be requeued so we do not # want it to pass through our retry logic. @channel.reject(hdr.delivery_tag, requeue) else handle_retry(hdr, props, msg, :reject) end end |