Class: Tackle::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/tackle/consumer.rb,
lib/tackle/consumer/queue.rb,
lib/tackle/consumer/params.rb,
lib/tackle/consumer/message.rb,
lib/tackle/consumer/exchange.rb,
lib/tackle/consumer/dead_queue.rb,
lib/tackle/consumer/main_queue.rb,
lib/tackle/consumer/delay_queue.rb,
lib/tackle/consumer/active_record_connection_reaper.rb

Defined Under Namespace

Modules: ActiveRecordConnectionReaper Classes: DeadQueue, DelayQueue, Exchange, MainQueue, Message, Params, Queue

Instance Method Summary collapse

Constructor Details

#initialize(params) ⇒ Consumer

Returns a new instance of Consumer.



14
15
16
17
18
19
# File 'lib/tackle/consumer.rb', line 14

def initialize(params)
  @params = params
  @logger = @params.logger

  setup_rabbit_connections
end

Instance Method Details

#process_message(message, &block) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/tackle/consumer.rb', line 44

def process_message(message, &block)
  Tackle::Consumer::ActiveRecordConnectionReaper.run do
    begin
      message.log_info "Calling message processor"

      response = block.call(message.payload)

      unless @params.manual_ack?
        response = Tackle::ACK
      end

      case response
      when Tackle::ACK
        message.ack
      when Tackle::NACK
        redeliver_message(message, "Received Tackle::NACK")
      else
        raise "Response must be either Tackle::ACK or Tackle::NACK"
      end
    rescue Exception => ex
      redeliver_message(message, "Received exception '#{ex}'")

      raise ex
    end
  end
end

#redeliver_message(message, reason) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/tackle/consumer.rb', line 71

def redeliver_message(message, reason)
  message.log_error "Failed to process message. #{reason}"
  message.log_error "Retry count #{message.retry_count}/#{@params.retry_limit}"

  if message.retry_count < @params.retry_limit
    @delay_queue.publish(message)
  else
    @dead_queue.publish(message)
  end

  message.nack
end

#setup_rabbit_connectionsObject



21
22
23
24
25
26
27
28
29
30
# File 'lib/tackle/consumer.rb', line 21

def setup_rabbit_connections
  @connection = Tackle::Connection.new(@params.amqp_url, @params.exception_handler, @logger, @params.connection)

  @exchange    = Exchange.new(@params.service, @params.routing_key, @connection, @logger)
  @main_queue  = MainQueue.new(@exchange, @connection, @logger)
  @delay_queue = DelayQueue.new(@params.retry_delay, @exchange, @connection, @logger)
  @dead_queue  = DeadQueue.new(@exchange, @connection, @logger)

  @exchange.bind_to_exchange(@params.exchange)
end

#subscribe(&block) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/tackle/consumer.rb', line 32

def subscribe(&block)
  @logger.info "Subscribing to the main queue '#{@main_queue.name}'"

  @main_queue.subscribe { |message| process_message(message, &block) }
rescue Interrupt => _
  @connection.close
rescue StandardError => ex
  @logger.error("An exception occured message='#{ex.message}'")

  raise ex
end