Class: PikaQue::Handlers::RetryHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/pika_que/handlers/retry_handler.rb

Constant Summary collapse

DEFAULT_RETRY_OPTS =

With retry_prefix = 'pika-que', the handler creates the following exchanges: pika-que-retry, pika-que-retry-requeue, pika-que-error; and the following queues: pika-que-retry-60 (default for const mode), pika-que-retry-120, pika-que-retry-240, pika-que-retry-480, pika-que-retry-960.

retry_mode can be either :exp or :const

# Default retry settings. The constructor layers these over PikaQue.config and
# then applies any caller-supplied opts on top.
{
  # Prefix for the derived names "<prefix>-retry", "<prefix>-retry-requeue"
  # and "<prefix>-error".
  :retry_prefix             => 'pika-que',
  # Either :exp or :const.
  :retry_mode               => :exp,
  # Stored as the handler's maximum retry count.
  :retry_max_times          => 5,
  # Stored as the handler's backoff base; presumably the backoff_base fed to
  # next_ttl, i.e. (backoff_base + 15) * 2 ** (count + 1) -- TODO confirm.
  :retry_backoff_base       => 0,
  # Noted in the constructor as being "for example/dev/test"; presumably
  # converts the backoff period to milliseconds -- TODO confirm.
  :retry_backoff_multiplier => 1000,
  # Backoff used in :const mode (matches the "<prefix>-retry-60" queue);
  # presumably seconds -- TODO confirm.
  :retry_const_backoff      => 60
}.freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ RetryHandler

Returns a new instance of RetryHandler.



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/pika_que/handlers/retry_handler.rb', line 27

# Builds a retry handler: resolves its options, opens a dedicated channel and
# then declares the exchanges and queues it needs.
#
# Options are resolved by merging, in order, PikaQue.config,
# DEFAULT_RETRY_OPTS and +opts+.
#
# @param opts [Hash] per-handler overrides; +:connection+ may supply the
#   connection to use instead of PikaQue.connection
# @return [RetryHandler]
def initialize(opts = {})
  @opts = PikaQue.config.merge(DEFAULT_RETRY_OPTS).merge(opts)
  @connection = opts[:connection] || PikaQue.connection
  @channel = @connection.create_channel
  @retry_monitor, @error_monitor = Monitor.new, Monitor.new

  # Every derived name shares the configured prefix.
  prefix = @opts[:retry_prefix]
  @retry_name = "#{prefix}-retry"
  @requeue_name = "#{prefix}-retry-requeue"
  @error_name = "#{prefix}-error"

  @max_retries = @opts[:retry_max_times]
  @backoff_base = @opts[:retry_backoff_base]
  # The multiplier exists for example/dev/test use.
  @backoff_multiplier = @opts[:retry_backoff_multiplier]

  setup_exchanges
  setup_queues
end

Class Method Details

.backoff_periods(max_retries, backoff_base) ⇒ Object

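Backoff formula: (X + 15) * 2 ** (count + 1), where X is backoff_base (defaults to 0). Relative to the X = 0 schedule, X = 15 gives 2x the delay, 30 gives 3x, 45 gives 4x, 60 gives 5x; larger bases (120, 180, etc.) scale the same way.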



77
78
79
# File 'lib/pika_que/handlers/retry_handler.rb', line 77

# Lists the backoff period for every retry attempt from 1 up to +max_retries+.
#
# @param max_retries [Integer] number of attempts to compute a period for
# @param backoff_base [Numeric] base value forwarded to next_ttl
# @return [Array] one next_ttl result per attempt, in attempt order
def self.backoff_periods(max_retries, backoff_base)
  attempts = 1..max_retries
  attempts.map do |attempt|
    next_ttl(attempt, backoff_base)
  end
end

.next_ttl(count, backoff_base) ⇒ Object



81
82
83
# File 'lib/pika_que/handlers/retry_handler.rb', line 81

# Computes the backoff period for a given attempt:
# (backoff_base + 15) * 2 ** (count + 1).
#
# @param count [Integer] attempt number
# @param backoff_base [Numeric] value added to the fixed 15 before doubling
# @return [Numeric] the backoff period for this attempt
def self.next_ttl(count, backoff_base)
  step = backoff_base + 15
  doublings = 2 ** (count + 1)
  step * doublings
end

Instance Method Details

#bind_queue(queue, routing_key) ⇒ Object



46
47
48
49
# File 'lib/pika_que/handlers/retry_handler.rb', line 46

# Binds a worker queue to the requeue exchange under the given routing key.
#
# @param queue [#bind] the worker queue to bind
# @param routing_key [Object] routing key used for the binding
# @return [Object] whatever +queue.bind+ returns
def bind_queue(queue, routing_key)
  queue.bind(@requeue_exchange, routing_key: routing_key)
end

#close ⇒ Object



68
69
70
# File 'lib/pika_que/handlers/retry_handler.rb', line 68

# Closes the handler's channel if it is still open; does nothing otherwise.
#
# @return [Object, nil] the result of closing the channel, or nil when it was
#   already closed
def close
  return if @channel.closed?

  @channel.close
end

#handle(response_code, channel, delivery_info, metadata, msg, error = nil) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/pika_que/handlers/retry_handler.rb', line 51

# Dispatches a worker's response code for a delivered message.
#
# * +:ack+     - acknowledges the single delivery.
# * +:reject+  - hands the message to handle_retry with +:reject+ as the reason.
# * +:requeue+ - rejects the delivery with the requeue flag set.
# * otherwise  - hands the message to handle_retry with +error+ as the reason.
#
# @param response_code [Symbol] outcome reported for the message
# @param channel [#acknowledge, #reject] channel the message was delivered on
# @param delivery_info [#delivery_tag] delivery information for the message
# @param metadata [Object] message metadata, passed through to handle_retry
# @param msg [Object] message payload (also interpolated into debug logs)
# @param error [Object, nil] failure reason used by the fallback branch
# @return [Object] whatever the selected branch returns
def handle(response_code, channel, delivery_info, metadata, msg, error = nil)
  case response_code
  when :ack
    PikaQue.logger.debug "RetryHandler acknowledge <#{msg}>"
    # false: acknowledge only this delivery tag, not every earlier one.
    channel.acknowledge(delivery_info.delivery_tag, false)
  when :reject
    PikaQue.logger.debug "RetryHandler reject retry <#{msg}>"
    handle_retry(channel, delivery_info, metadata, msg, :reject)
  when :requeue
    PikaQue.logger.debug "RetryHandler requeue <#{msg}>"
    # true: ask for the message to be requeued.
    channel.reject(delivery_info.delivery_tag, true)
  else
    PikaQue.logger.debug "RetryHandler error retry <#{msg}>"
    handle_retry(channel, delivery_info, metadata, msg, error)
  end
end