Class: SneakersRetry::Handlers::Maxretry2
- Inherits:
-
Object
- Object
- SneakersRetry::Handlers::Maxretry2
- Defined in:
- lib/sneakers-retry/handlers/maxretry2.rb
Overview
Maxretry uses dead letter policies on Rabbitmq to requeue and retry messages after failure (rejections, errors and timeouts). When the maximum number of retries is reached it will put the message on an error queue. This handler will only retry at the queue level. To accomplish that, the setup is a bit complex.
Input:
worker_exchange (eXchange)
worker_queue (Queue)
We create:
worker_queue-retry - (X) where we setup the worker queue to dead-letter.
worker_queue-retry - (Q) queue bound to ^ exchange, dead-letters to
worker_queue-retry-requeue.
worker_queue-error - (X) where to send max-retry failures
worker_queue-error - (Q) bound to worker_queue-error.
worker_queue-retry-requeue - (X) exchange to bind worker_queue to for
requeuing directly to the worker_queue.
This requires that you setup arguments to the worker queue to line up the dead letter queue. See the example for more information.
Many of these can be override with options:
-
retry_exchange - sets retry exchange & queue
-
retry_error_exchange - sets error exchange and queue
-
retry_requeue_exchange - sets the exchange created to re-queue things back to the worker queue.
Instance Method Summary collapse
- #acknowledge(hdr, props, msg) ⇒ Object
- #error(hdr, props, msg, err) ⇒ Object
-
#initialize(channel, queue, opts) ⇒ Maxretry2
constructor
A new instance of Maxretry2.
- #noop(hdr, props, msg) ⇒ Object
- #reject(hdr, props, msg, requeue = false) ⇒ Object
- #timeout(hdr, props, msg) ⇒ Object
Constructor Details
#initialize(channel, queue, opts) ⇒ Maxretry2
Returns a new instance of Maxretry2.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/sneakers-retry/handlers/maxretry2.rb', line 35 def initialize(channel, queue, opts) puts "################################" @worker_queue_name = queue.name Sneakers.logger.debug do "#{log_prefix} creating handler, opts=#{opts}" end @channel = channel @opts = opts # Construct names, defaulting where suitable retry_name = @opts[:retry_exchange] || "#{@worker_queue_name}-retry" error_name = @opts[:retry_error_exchange] || "#{@worker_queue_name}-error" requeue_name = @opts[:retry_requeue_exchange] || "#{@worker_queue_name}-retry-requeue" retry_routing_key = @opts[:retry_routing_key] || "#" # Create the exchanges @retry_exchange, @error_exchange, @requeue_exchange = [retry_name, error_name, requeue_name].map do |name| Sneakers.logger.debug { "#{log_prefix} creating exchange=#{name}" } @channel.exchange(name, :type => 'topic', :durable => exchange_durable?) end # Create the queues and bindings Sneakers.logger.debug do "#{log_prefix} creating queue=#{retry_name} x-dead-letter-exchange=#{requeue_name}" end @retry_queue = @channel.queue(retry_name, :durable => queue_durable?, :arguments => { :'x-dead-letter-exchange' => requeue_name, :'x-message-ttl' => @opts[:retry_timeout] || 60000 }) @retry_queue.bind(@retry_exchange, :routing_key => '#') Sneakers.logger.debug do "#{log_prefix} creating queue=#{error_name}" end @error_queue = @channel.queue(error_name, :durable => queue_durable?) @error_queue.bind(@error_exchange, :routing_key => '#') # Finally, bind the worker queue to our requeue exchange queue.bind(@requeue_exchange, :routing_key => retry_routing_key) @max_retries = @opts[:retry_max_times] || 5 end |
Instance Method Details
#acknowledge(hdr, props, msg) ⇒ Object
85 86 87 |
# File 'lib/sneakers-retry/handlers/maxretry2.rb', line 85 def acknowledge(hdr, props, msg) @channel.acknowledge(hdr.delivery_tag, false) end |
#error(hdr, props, msg, err) ⇒ Object
100 101 102 |
# File 'lib/sneakers-retry/handlers/maxretry2.rb', line 100 def error(hdr, props, msg, err) handle_retry(hdr, props, msg, err) end |
#noop(hdr, props, msg) ⇒ Object
108 109 110 |
# File 'lib/sneakers-retry/handlers/maxretry2.rb', line 108 def noop(hdr, props, msg) end |
#reject(hdr, props, msg, requeue = false) ⇒ Object
89 90 91 92 93 94 95 96 97 |
# File 'lib/sneakers-retry/handlers/maxretry2.rb', line 89 def reject(hdr, props, msg, requeue = false) if requeue # This was explicitly rejected specifying it be requeued so we do not # want it to pass through our retry logic. @channel.reject(hdr.delivery_tag, requeue) else handle_retry(hdr, props, msg, :reject) end end |
#timeout(hdr, props, msg) ⇒ Object
104 105 106 |
# File 'lib/sneakers-retry/handlers/maxretry2.rb', line 104 def timeout(hdr, props, msg) handle_retry(hdr, props, msg, :timeout) end |