Class: PikaQue::Handlers::RetryHandler
- Inherits:
-
Object
- Object
- PikaQue::Handlers::RetryHandler
- Defined in:
- lib/pika_que/handlers/retry_handler.rb
Constant Summary collapse
- DEFAULT_RETRY_OPTS =
Creates the following exchanges when retry_prefix = pika-que: pika-que-retry, pika-que-retry-requeue, pika-que-error; and the following queues: pika-que-retry-60 (default for const mode), pika-que-retry-120, pika-que-retry-240, pika-que-retry-480, pika-que-retry-960.
retry_mode can be either :exp or :const
{ :retry_prefix => 'pika-que', :retry_mode => :exp, :retry_max_times => 5, :retry_backoff_base => 0, :retry_backoff_multiplier => 1000, :retry_const_backoff => 60 }.freeze
Class Method Summary collapse
-
.backoff_periods(max_retries, backoff_base) ⇒ Object
Formula: (X + 15) * 2 ** (count + 1), where the backoff base X may be 0, 15 (2x the delays of X = 0), 30 (3x), 45 (4x), 60 (5x), 120, 180, etc., and defaults to 0.
- .next_ttl(count, backoff_base) ⇒ Object
Instance Method Summary collapse
- #bind_queue(queue, routing_key) ⇒ Object
- #close ⇒ Object
- #handle(response_code, channel, delivery_info, metadata, msg, error = nil) ⇒ Object
-
#initialize(opts = {}) ⇒ RetryHandler
constructor
A new instance of RetryHandler.
Constructor Details
#initialize(opts = {}) ⇒ RetryHandler
Returns a new instance of RetryHandler.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/pika_que/handlers/retry_handler.rb', line 27
#
# Builds a retry handler: resolves its options, opens a channel, and sets up
# the retry topology.
#
# Options are resolved by merging DEFAULT_RETRY_OPTS, and then +opts+, over
# PikaQue.config. The connection comes from opts[:connection] when given,
# otherwise from PikaQue.connection.
#
# @param opts [Hash] per-handler overrides (:connection, :retry_prefix,
#   :retry_max_times, :retry_backoff_base, :retry_backoff_multiplier, ...)
def initialize(opts = {})
  @opts = PikaQue.config.merge(DEFAULT_RETRY_OPTS).merge(opts)

  @retry_monitor = Monitor.new
  @error_monitor = Monitor.new

  @connection = opts[:connection] || PikaQue.connection
  @channel = @connection.create_channel

  @max_retries = @opts[:retry_max_times]
  @backoff_base = @opts[:retry_backoff_base]
  # This is for example/dev/test
  @backoff_multiplier = @opts[:retry_backoff_multiplier]

  # All exchange names share the configured prefix.
  prefix = @opts[:retry_prefix]
  @retry_name = "#{prefix}-retry"
  @requeue_name = "#{prefix}-retry-requeue"
  @error_name = "#{prefix}-error"

  # NOTE(review): both helpers are defined outside this listing; presumably
  # they declare the exchanges/queues named above -- confirm.
  setup_exchanges
  setup_queues
end
Class Method Details
.backoff_periods(max_retries, backoff_base) ⇒ Object
Formula: (X + 15) * 2 ** (count + 1), where the backoff base X may be 0, 15 (2x the delays of X = 0), 30 (3x), 45 (4x), 60 (5x), 120, 180, etc., and defaults to 0.
77 78 79 |
# File 'lib/pika_que/handlers/retry_handler.rb', line 77
#
# Lists the backoff period for every retry attempt from 1 up to
# +max_retries+, each computed by next_ttl.
#
# @param max_retries [Integer] number of retry attempts
# @param backoff_base [Numeric] base value X passed through to next_ttl
# @return [Array] one period per attempt, in attempt order
def self.backoff_periods(max_retries, backoff_base)
  attempts = 1..max_retries
  attempts.map { |attempt| next_ttl(attempt, backoff_base) }
end
.next_ttl(count, backoff_base) ⇒ Object
81 82 83 |
# File 'lib/pika_que/handlers/retry_handler.rb', line 81
#
# Computes the backoff period for a given retry attempt:
# (backoff_base + 15) * 2 ** (count + 1).
#
# With backoff_base = 0 this yields 60, 120, 240, 480, 960 for counts 1..5.
#
# @param count [Integer] retry attempt number
# @param backoff_base [Numeric] base value X added to 15 before scaling
# @return [Numeric] the backoff period for this attempt
def self.next_ttl(count, backoff_base)
  scale = backoff_base + 15
  doubling = 2**(count + 1)
  scale * doubling
end
Instance Method Details
#bind_queue(queue, routing_key) ⇒ Object
46 47 48 49 |
# File 'lib/pika_que/handlers/retry_handler.rb', line 46
#
# Binds a worker queue to the requeue exchange under +routing_key+.
#
# @param queue [#bind] the worker queue to bind
# @param routing_key [String] routing key for the binding
# @return [Object] whatever queue.bind returns
def bind_queue(queue, routing_key)
  # bind the worker queue to requeue exchange
  requeue_exchange = @requeue_exchange
  queue.bind(requeue_exchange, :routing_key => routing_key)
end
#close ⇒ Object
68 69 70 |
# File 'lib/pika_que/handlers/retry_handler.rb', line 68
#
# Closes the handler's channel unless it already reports itself closed.
#
# @return [Object, nil] the result of @channel.close, or nil when the
#   channel was already closed
def close
  return if @channel.closed?

  @channel.close
end
#handle(response_code, channel, delivery_info, metadata, msg, error = nil) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/pika_que/handlers/retry_handler.rb', line 51
#
# Dispatches a processed message according to the worker's response code.
#
# * :ack     - acknowledges the delivery (second argument +false+).
# * :reject  - hands the message to handle_retry with :reject as the reason.
# * :requeue - rejects the delivery with the second argument +true+.
# * anything else - hands the message to handle_retry with +error+.
#
# @param response_code [Symbol] outcome reported for the message
# @param channel [#acknowledge, #reject] channel the message was delivered on
# @param delivery_info [#delivery_tag] delivery information for the message
# @param metadata [Object] message metadata, passed through to handle_retry
# @param msg [Object] message payload (also interpolated into debug logs)
# @param error [Object, nil] error passed to handle_retry for unknown codes
def handle(response_code, channel, delivery_info, metadata, msg, error = nil)
  case response_code
  when :ack
    PikaQue.logger.debug "RetryHandler acknowledge <#{msg}>"
    channel.acknowledge(delivery_info.delivery_tag, false)
  when :reject
    PikaQue.logger.debug "RetryHandler reject retry <#{msg}>"
    handle_retry(channel, delivery_info, metadata, msg, :reject)
  when :requeue
    PikaQue.logger.debug "RetryHandler requeue <#{msg}>"
    channel.reject(delivery_info.delivery_tag, true)
  else
    PikaQue.logger.debug "RetryHandler error retry <#{msg}>"
    handle_retry(channel, delivery_info, metadata, msg, error)
  end
end