Class: PikaQue::Handlers::ErrorHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/pika_que/handlers/error_handler.rb

Constant Summary collapse

DEFAULT_ERROR_OPTS =
{
  :error_prefix => 'pika-que'
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ ErrorHandler

Returns a new instance of ErrorHandler.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/pika_que/handlers/error_handler.rb', line 9

def initialize(opts = {})
  @opts = PikaQue.config.merge(DEFAULT_ERROR_OPTS).merge(opts)
  @connection = @opts[:connection] || PikaQue.connection
  @channel = @connection.create_channel
  error_ex_name = error_q_name = "#{@opts[:error_prefix]}-error"
  if @opts[:queue]
    # handle deprecated options
    error_ex_name = @opts[:exchange]
    error_q_name = @opts[:queue]
  end
  @exchange = @channel.exchange(error_ex_name, type: :topic, durable: exchange_durable?)
  @queue = @channel.queue(error_q_name, durable: queue_durable?)
  @queue.bind(@exchange, routing_key: '#')
  @monitor = Monitor.new
end

Instance Method Details

#bind_queue(queue, routing_key) ⇒ Object



25
26
# File 'lib/pika_que/handlers/error_handler.rb', line 25

def bind_queue(queue, routing_key)
end

#closeObject



46
47
48
# File 'lib/pika_que/handlers/error_handler.rb', line 46

def close
  @channel.close unless @channel.closed?
end

#handle(response_code, channel, delivery_info, metadata, msg, error = nil) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/pika_que/handlers/error_handler.rb', line 28

def handle(response_code, channel, delivery_info, , msg, error = nil)
  case response_code
  when :ack
    PikaQue.logger.debug "ErrorHandler acknowledge <#{msg}>"
    channel.acknowledge(delivery_info.delivery_tag, false)
  when :reject
    PikaQue.logger.debug "ErrorHandler reject <#{msg}>"
    channel.reject(delivery_info.delivery_tag, false)
  when :requeue
    PikaQue.logger.debug "ErrorHandler requeue <#{msg}>"
    channel.reject(delivery_info.delivery_tag, true)
  else
    PikaQue.logger.debug "ErrorHandler publishing <#{msg}> to [#{@queue.name}]"
    publish(delivery_info, msg)
    channel.acknowledge(delivery_info.delivery_tag, false)
  end
end