Class: PikaQue::Handlers::DLXRetryHandler
- Inherits:
-
Object
- Object
- PikaQue::Handlers::DLXRetryHandler
- Defined in:
- lib/pika_que/handlers/dlx_retry_handler.rb
Constant Summary collapse
- DEFAULT_RETRY_OPTS =
Creates the following exchanges (with retry_prefix = pika-que and the default backoff): pika-que-retry-60, pika-que-retry-requeue, pika-que-error; and the following queue: pika-que-retry-60 (with the default backoff).
{ :retry_prefix => 'pika-que', :retry_max_times => 5, :retry_backoff => 60, :retry_backoff_multiplier => 1000, }.freeze
Instance Method Summary collapse
- #bind_queue(queue, routing_key) ⇒ Object
- #close ⇒ Object
- #handle(response_code, channel, delivery_info, metadata, msg, error = nil) ⇒ Object
-
#initialize(opts = {}) ⇒ DLXRetryHandler
constructor
A new instance of DLXRetryHandler.
Constructor Details
#initialize(opts = {}) ⇒ DLXRetryHandler
Returns a new instance of DLXRetryHandler.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/pika_que/handlers/dlx_retry_handler.rb', line 20
#
# Builds the handler: resolves its options, opens a channel on the
# connection, derives the retry/requeue/error names from the configured
# prefix, then calls setup_exchanges and setup_queues.
#
# @param opts [Hash] per-instance overrides, merged last (after
#   PikaQue.config and DEFAULT_RETRY_OPTS). When opts[:connection] is
#   given it is used instead of PikaQue.connection.
def initialize(opts = {})
  @opts = PikaQue.config.merge(DEFAULT_RETRY_OPTS).merge(opts)

  # The connection override is read from the raw argument, not from @opts.
  @connection = opts[:connection] || PikaQue.connection
  @channel = @connection.create_channel
  @error_monitor = Monitor.new

  @max_retries = @opts[:retry_max_times]
  @backoff_multiplier = @opts[:retry_backoff_multiplier] # This is for example/dev/test

  # An explicit :retry_dlx wins; otherwise the retry exchange name is
  # "<prefix>-retry-<backoff>" (e.g. "pika-que-retry-60" with the defaults).
  prefix = @opts[:retry_prefix]
  @retry_ex_name = @opts[:retry_dlx] || "#{prefix}-retry-#{@opts[:retry_backoff]}"
  @retry_name = "#{prefix}-retry"
  @requeue_name = "#{prefix}-retry-requeue"
  @error_name = "#{prefix}-error"

  # routing key => worker queue name, filled in by bind_queue.
  @queue_name_lookup = {}

  setup_exchanges
  setup_queues
end
Instance Method Details
#bind_queue(queue, routing_key) ⇒ Object
40 41 42 43 44 |
# File 'lib/pika_que/handlers/dlx_retry_handler.rb', line 40
#
# Records which queue serves +routing_key+ and binds that queue to the
# requeue exchange under the same routing key.
#
# @param queue [#name, #bind] the worker queue to bind
# @param routing_key [Object] key used both for the lookup table and the binding
# @return [Object] whatever queue.bind returns
def bind_queue(queue, routing_key)
  # Remember the queue name for this routing key.
  @queue_name_lookup.store(routing_key, queue.name)

  # bind the worker queue to requeue exchange
  queue.bind(@requeue_exchange, routing_key: routing_key)
end
#close ⇒ Object
63 64 65 |
# File 'lib/pika_que/handlers/dlx_retry_handler.rb', line 63
#
# Closes the handler's channel, unless the channel already reports itself
# as closed.
def close
  return if @channel.closed?

  @channel.close
end
#handle(response_code, channel, delivery_info, metadata, msg, error = nil) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/pika_que/handlers/dlx_retry_handler.rb', line 46
#
# Dispatches on the worker's response code for a delivered message.
#
# * :ack     - acknowledges the delivery on +channel+.
# * :reject  - hands the message to handle_retry with reason :reject.
# * :requeue - rejects the delivery with the second argument set to true
#              (presumably Bunny's requeue flag - confirm against Bunny docs).
# * anything else - hands the message to handle_retry with +error+ as the reason.
#
# @param response_code [Symbol] outcome reported for the message
# @param channel [#acknowledge, #reject] channel the message was delivered on
# @param delivery_info [#delivery_tag] delivery information for the message
# @param metadata [Object] message metadata, forwarded untouched to handle_retry
# @param msg [Object] message payload (interpolated into the debug log)
# @param error [Object, nil] reason passed to handle_retry in the fallback branch
def handle(response_code, channel, delivery_info, metadata, msg, error = nil)
  case response_code
  when :ack
    PikaQue.logger.debug "DLXRetryHandler acknowledge <#{msg}>"
    channel.acknowledge(delivery_info.delivery_tag, false)
  when :reject
    PikaQue.logger.debug "DLXRetryHandler reject retry <#{msg}>"
    handle_retry(channel, delivery_info, metadata, msg, :reject)
  when :requeue
    PikaQue.logger.debug "DLXRetryHandler requeue <#{msg}>"
    channel.reject(delivery_info.delivery_tag, true)
  else
    PikaQue.logger.debug "DLXRetryHandler error retry <#{msg}>"
    handle_retry(channel, delivery_info, metadata, msg, error)
  end
end