Class: PikaQue::Handlers::DLXRetryHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/pika_que/handlers/dlx_retry_handler.rb

Constant Summary collapse

DEFAULT_RETRY_OPTS =

Creates the following exchanges (with retry_prefix = pika-que and the default backoff): pika-que-retry-60, pika-que-retry-requeue, and pika-que-error; and the following queue: pika-que-retry-60 (with the default backoff).

{
  # Prefix the constructor uses to build the retry, requeue and error names.
  :retry_prefix             => 'pika-que',
  # Read by the constructor into @max_retries.
  :retry_max_times          => 5,
  # Suffix of the default retry exchange name ("<prefix>-retry-<backoff>");
  # the unit is not stated here — presumably seconds, TODO confirm.
  :retry_backoff            => 60,
  # Read into @backoff_multiplier; the constructor notes it exists for
  # example/dev/test use. Presumably scales the backoff — TODO confirm.
  :retry_backoff_multiplier => 1000,
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ DLXRetryHandler

Returns a new instance of DLXRetryHandler.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/pika_que/handlers/dlx_retry_handler.rb', line 20

# Builds the handler: merges options, opens a channel, derives the
# exchange/queue names from the retry prefix, then declares the exchanges
# and queues.
#
# @param opts [Hash] per-handler overrides; may also carry :connection and
#   :retry_dlx in addition to the DEFAULT_RETRY_OPTS keys
def initialize(opts = {})
  # Precedence, lowest to highest: PikaQue.config, DEFAULT_RETRY_OPTS, opts.
  @opts = PikaQue.config.merge(DEFAULT_RETRY_OPTS).merge(opts)

  # The connection is taken from the caller's opts only (not the merged
  # set), falling back to the shared PikaQue connection.
  @connection = opts[:connection] || PikaQue.connection
  @channel = @connection.create_channel
  @error_monitor = Monitor.new
  @queue_name_lookup = {}

  @backoff_multiplier = @opts[:retry_backoff_multiplier] # This is for example/dev/test
  @max_retries = @opts[:retry_max_times]

  # Every name hangs off the same prefix; an explicit :retry_dlx replaces
  # the default "<prefix>-retry-<backoff>" retry exchange name.
  prefix = @opts[:retry_prefix]
  @retry_name = "#{prefix}-retry"
  @retry_ex_name = @opts[:retry_dlx] || "#{@retry_name}-#{@opts[:retry_backoff]}"
  @requeue_name = "#{@retry_name}-requeue"
  @error_name = "#{prefix}-error"

  setup_exchanges
  setup_queues
end

Instance Method Details

#bind_queue(queue, routing_key) ⇒ Object



40
41
42
43
44
# File 'lib/pika_que/handlers/dlx_retry_handler.rb', line 40

# Registers a worker queue under +routing_key+ and binds it to the requeue
# exchange with that key.
#
# @param queue [#name, #bind] the worker queue
# @param routing_key [Object] key used both for the lookup table and the binding
# @return [Object] whatever +queue.bind+ returns
def bind_queue(queue, routing_key)
  # Remember which queue name belongs to this routing key.
  @queue_name_lookup.store(routing_key, queue.name)
  # Attach the worker queue to the requeue exchange.
  queue.bind(@requeue_exchange, routing_key: routing_key)
end

#close ⇒ Object



63
64
65
# File 'lib/pika_que/handlers/dlx_retry_handler.rb', line 63

# Closes the handler's channel, doing nothing if it is already closed.
#
# @return [Object, nil] the result of +@channel.close+, or nil when the
#   channel was already closed
def close
  return if @channel.closed?

  @channel.close
end

#handle(response_code, channel, delivery_info, metadata, msg, error = nil) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/pika_que/handlers/dlx_retry_handler.rb', line 46

# Dispatches on the worker's response code for a delivered message.
#
# * +:ack+     - acknowledges the delivery on +channel+
# * +:reject+  - passes the message to +handle_retry+ with +:reject+ as the reason
# * +:requeue+ - rejects the delivery with the second argument set to true
#   (presumably the broker requeue flag; confirm against the channel API)
# * anything else - passes the message to +handle_retry+ along with +error+
#
# @param response_code [Symbol] outcome reported for the message
# @param channel [#acknowledge, #reject] channel the message was delivered on
# @param delivery_info [#delivery_tag] delivery information for the message
# @param metadata [Object] message metadata, passed through to +handle_retry+
# @param msg [Object] message payload; interpolated into the debug log lines
# @param error [Object, nil] error forwarded to +handle_retry+ in the fallback branch
def handle(response_code, channel, delivery_info, metadata, msg, error = nil)
  case response_code
  when :ack
    PikaQue.logger.debug "DLXRetryHandler acknowledge <#{msg}>"
    channel.acknowledge(delivery_info.delivery_tag, false)
  when :reject
    PikaQue.logger.debug "DLXRetryHandler reject retry <#{msg}>"
    handle_retry(channel, delivery_info, metadata, msg, :reject)
  when :requeue
    PikaQue.logger.debug "DLXRetryHandler requeue <#{msg}>"
    channel.reject(delivery_info.delivery_tag, true)
  else
    PikaQue.logger.debug "DLXRetryHandler error retry <#{msg}>"
    handle_retry(channel, delivery_info, metadata, msg, error)
  end
end