Class: Sneakers::Handlers::Maxretry

Inherits:
Object
  • Object
show all
Defined in:
lib/sneakers/handlers/maxretry.rb

Overview

Maxretry uses dead letter policies on Rabbitmq to requeue and retry messages after failure (rejections and errors). When the maximum number of retries is reached it will put the message on an error queue. This handler will only retry at the queue level. To accomplish that, the setup is a bit complex.

Input:

worker_exchange (eXchange)
worker_queue (Queue)

We create:

worker_queue-retry - (X) where we setup the worker queue to dead-letter.
worker_queue-retry - (Q) queue bound to ^ exchange, dead-letters to
                     worker_queue-retry-requeue.
worker_queue-error - (X) where to send max-retry failures
worker_queue-error - (Q) bound to worker_queue-error.
worker_queue-retry-requeue - (X) exchange to bind worker_queue to for
                             requeuing directly to the worker_queue.

This requires that you setup arguments to the worker queue to line up the dead letter queue. See the example for more information.

Many of these can be override with options:

  • retry_exchange - sets retry exchange & queue

  • retry_error_exchange - sets error exchange and queue

  • retry_requeue_exchange - sets the exchange created to re-queue things back to the worker queue.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(channel, queue, opts) ⇒ Maxretry

Returns a new instance of Maxretry.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/sneakers/handlers/maxretry.rb', line 36

def initialize(channel, queue, opts)
  @worker_queue_name = queue.name
  Sneakers.logger.debug do
    "#{log_prefix} creating handler, opts=#{opts}"
  end

  @channel = channel
  @opts = opts

  # Construct names, defaulting where suitable
  retry_name = @opts[:retry_exchange] || "#{@worker_queue_name}-retry"
  error_name = @opts[:retry_error_exchange] || "#{@worker_queue_name}-error"
  requeue_name = @opts[:retry_requeue_exchange] || "#{@worker_queue_name}-retry-requeue"
  retry_routing_key = @opts[:retry_routing_key] || "#"

  # Create the exchanges
  @retry_exchange, @error_exchange, @requeue_exchange = [retry_name, error_name, requeue_name].map do |name|
    Sneakers.logger.debug { "#{log_prefix} creating exchange=#{name}" }
    @channel.exchange(name,
                      :type => 'topic',
                      :durable => exchange_durable?)
  end

  # Create the queues and bindings
  Sneakers.logger.debug do
    "#{log_prefix} creating queue=#{retry_name} x-dead-letter-exchange=#{requeue_name}"
  end
  @retry_queue = @channel.queue(retry_name,
                               :durable => queue_durable?,
                               :arguments => {
                                 :'x-dead-letter-exchange' => requeue_name,
                                 :'x-message-ttl' => @opts[:retry_timeout] || 60000
                               })
  @retry_queue.bind(@retry_exchange, :routing_key => '#')

  Sneakers.logger.debug do
    "#{log_prefix} creating queue=#{error_name}"
  end
  @error_queue = @channel.queue(error_name,
                                :durable => queue_durable?)
  @error_queue.bind(@error_exchange, :routing_key => '#')

  # Finally, bind the worker queue to our requeue exchange
  queue.bind(@requeue_exchange, :routing_key => retry_routing_key)

  @max_retries = @opts[:retry_max_times] || 5

end

Class Method Details

.configure_queue(name, opts) ⇒ Object



85
86
87
88
89
90
# File 'lib/sneakers/handlers/maxretry.rb', line 85

def self.configure_queue(name, opts)
  retry_name = opts.fetch(:retry_exchange, "#{name}-retry")
  opt_args = opts[:queue_options][:arguments] ? opts[:queue_options][:arguments].inject({}){|memo,(k,v)| memo[k.to_sym] = v; memo} : {}
  opts[:queue_options][:arguments] = { :'x-dead-letter-exchange' => retry_name }.merge(opt_args)
  opts[:queue_options]
end

Instance Method Details

#acknowledge(hdr, props, msg) ⇒ Object



92
93
94
# File 'lib/sneakers/handlers/maxretry.rb', line 92

def acknowledge(hdr, props, msg)
  @channel.acknowledge(hdr.delivery_tag, false)
end

#error(hdr, props, msg, err) ⇒ Object



107
108
109
# File 'lib/sneakers/handlers/maxretry.rb', line 107

def error(hdr, props, msg, err)
  handle_retry(hdr, props, msg, err)
end

#noop(hdr, props, msg) ⇒ Object



111
112
113
# File 'lib/sneakers/handlers/maxretry.rb', line 111

def noop(hdr, props, msg)

end

#reject(hdr, props, msg, requeue = false) ⇒ Object



96
97
98
99
100
101
102
103
104
# File 'lib/sneakers/handlers/maxretry.rb', line 96

def reject(hdr, props, msg, requeue = false)
  if requeue
    # This was explicitly rejected specifying it be requeued so we do not
    # want it to pass through our retry logic.
    @channel.reject(hdr.delivery_tag, requeue)
  else
    handle_retry(hdr, props, msg, :reject)
  end
end