Class: PikaQue::Handlers::ErrorHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/pika_que/handlers/error_handler.rb

Constant Summary collapse

# Default settings for the error exchange and queue. The constructor merges
# these over PikaQue.config, and caller-supplied options are merged last.
# Only the outer hash is frozen; the nested :exchange_options hash is not.
DEFAULT_ERROR_OPTS = {
  exchange: 'pika-que-error',
  exchange_options: { type: :topic },
  queue: 'pika-que-error',
  routing_key: '#'
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ ErrorHandler

Returns a new instance of ErrorHandler.



12
13
14
15
16
17
18
19
20
# File 'lib/pika_que/handlers/error_handler.rb', line 12

# Resolves the handler options, opens a channel, and declares the error
# exchange and queue bound together with the configured routing key.
#
# Options are layered: PikaQue.config, then DEFAULT_ERROR_OPTS, then +opts+
# (later layers win on key collisions, per Hash#merge — assuming
# PikaQue.config is Hash-like; TODO confirm).
#
# @param opts [Hash] per-instance overrides; :connection, :exchange, :queue
#   and :routing_key are read here
def initialize(opts = {})
  defaults = PikaQue.config.merge(DEFAULT_ERROR_OPTS)
  @opts = defaults.merge(opts)

  # An explicitly supplied connection takes precedence over the shared one.
  @connection = @opts[:connection] || PikaQue.connection
  @channel = @connection.create_channel

  exchange_name = @opts[:exchange]
  exchange_settings = { type: exchange_type, durable: exchange_durable? }
  @exchange = @channel.exchange(exchange_name, **exchange_settings)

  @queue = @channel.queue(@opts[:queue], durable: queue_durable?).tap do |error_queue|
    error_queue.bind(@exchange, routing_key: @opts[:routing_key])
  end

  @monitor = Monitor.new
end

Instance Method Details

#bind_queue(queue, routing_key) ⇒ Object



22
23
# File 'lib/pika_que/handlers/error_handler.rb', line 22

# No-op: both arguments are ignored and nil is returned.
#
# @param queue [Object] unused
# @param routing_key [Object] unused
# @return [nil]
def bind_queue(queue, routing_key)
  nil
end

#close ⇒ Object



43
44
45
# File 'lib/pika_que/handlers/error_handler.rb', line 43

# Closes the handler's channel if it is still open.
#
# @return [Object, nil] whatever the channel's #close returns, or nil when
#   the channel was already closed
def close
  return if @channel.closed?

  @channel.close
end

#handle(response_code, channel, delivery_info, metadata, msg, error = nil) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/pika_que/handlers/error_handler.rb', line 25

# Settles a delivery on +channel+ according to +response_code+.
#
# * :ack     - acknowledges the delivery.
# * :reject  - rejects the delivery, passing +false+ as the second argument.
# * :requeue - rejects the delivery, passing +true+ as the second argument
#   (the requeue flag in Bunny's Channel#reject — assuming +channel+ is a
#   Bunny channel; TODO confirm).
# * anything else - republishes the message via #publish and then
#   acknowledges the original delivery.
#
# @param response_code [Symbol] outcome reported for the message
# @param channel [Object] channel the message was delivered on; must respond
#   to #acknowledge and #reject
# @param delivery_info [Object] must respond to #delivery_tag
# @param metadata [Object] message metadata; not used by this handler
# @param msg [Object] message payload
# @param error [Object, nil] error associated with the message; not used by
#   this handler
def handle(response_code, channel, delivery_info, metadata, msg, error = nil)
  case response_code
  when :ack
    PikaQue.logger.debug "ErrorHandler acknowledge <#{msg}>"
    channel.acknowledge(delivery_info.delivery_tag, false)
  when :reject
    PikaQue.logger.debug "ErrorHandler reject <#{msg}>"
    channel.reject(delivery_info.delivery_tag, false)
  when :requeue
    PikaQue.logger.debug "ErrorHandler requeue <#{msg}>"
    channel.reject(delivery_info.delivery_tag, true)
  else
    PikaQue.logger.debug "ErrorHandler publishing <#{msg}> to [#{@queue.name}]"
    publish(delivery_info, msg)
    # Publish first, then acknowledge the original delivery.
    channel.acknowledge(delivery_info.delivery_tag, false)
  end
end