Class: Watership::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/watership/consumer.rb

Instance Method Summary collapse

Constructor Details

#initialize(consumer, url, channel_options = {}, queue_options = {}) ⇒ Consumer

Returns a new instance of Consumer.



7
8
9
10
11
12
13
14
# File 'lib/watership/consumer.rb', line 7

def initialize(consumer, url, channel_options = {}, queue_options = {})
  @consumer = consumer
  @url = url
  @prefetch = channel_options.delete(:prefetch) || Integer(ENV.fetch("RABBIT_CONSUMER_PREFETCH", 200))
  @concurrency = channel_options.delete(:concurrency) || 1
  @channel_opts = {durable: true}.merge(channel_options)
  @queue_opts = {block: false, manual_ack: true}.merge(queue_options)
end

Instance Method Details

#ack_message(tag) ⇒ Object



67
68
69
70
# File 'lib/watership/consumer.rb', line 67

def ack_message(tag)
  logger.info "Acking message"
  channel.acknowledge(tag, false)
end

#bind(name, opts = {}) ⇒ Object



63
64
65
# File 'lib/watership/consumer.rb', line 63

def bind(name, opts = {})
  create_queue.bind(name, opts)
end

#channelObject



85
86
87
88
89
90
91
# File 'lib/watership/consumer.rb', line 85

def channel
  @channel ||= begin
    created_channel = connection.create_channel(nil, @concurrency)
    created_channel.prefetch(@prefetch)
    created_channel
  end
end

#clear_active_record_connectionsObject



106
107
108
109
110
# File 'lib/watership/consumer.rb', line 106

def clear_active_record_connections
  if defined?(::ActiveRecord::Base) && ::ActiveRecord::Base.respond_to?(:clear_active_connections!)
    ::ActiveRecord::Base.clear_active_connections!
  end
end

#connectionObject



81
82
83
# File 'lib/watership/consumer.rb', line 81

def connection
  @connection ||= Bunny.new(@url).tap { |bunny| bunny.start }
end

#consumeObject



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/watership/consumer.rb', line 16

def consume
  Thread.abort_on_exception = true
  queue = create_queue
  @concurrency.times do
    Thread.new do
      queue.subscribe(@queue_opts.dup) do |delivery_info, properties, payload|
        begin
          data = JSON.parse(payload)
          @consumer.new.call(data)
          success = true
        rescue StandardError => exception
          logger.error "Error thrown in subscribe block"
          logger.error exception.message
          logger.error exception.backtrace.join("\n")

          retries = data["retries"] || 0
          notify(exception, { payload: data, retries: retries })
          enqueue(data.merge(retries: (retries + 1)))
          success = true
        rescue Interrupt => exception
          logger.error "Interrupt in subscribe block"
          logger.warn "Stopped gracefully."
        ensure
          if success
            ack_message(delivery_info.delivery_tag)
          else
            reject_message(delivery_info.delivery_tag)
          end

          clear_active_record_connections
        end
      end
    end
  end

  # sleep forever
  sleeping_thread = Thread.new { sleep }
  Signal.trap("TERM") do
    sleeping_thread.terminate
  end
  sleeping_thread.join
rescue Interrupt
ensure
  logger.info "Closing Channel"
  channel.close
end

#create_queueObject



77
78
79
# File 'lib/watership/consumer.rb', line 77

def create_queue
  channel.queue(@consumer::QUEUE, @channel_opts)
end

#enqueue(message) ⇒ Object



93
94
95
# File 'lib/watership/consumer.rb', line 93

def enqueue(message)
  create_queue.publish(JSON.generate(message))
end

#loggerObject



102
103
104
# File 'lib/watership/consumer.rb', line 102

def logger
  @logger ||= defined?(Rails) ? Rails.logger : Logger.new(STDOUT)
end

#notify(exception, data) ⇒ Object



97
98
99
100
# File 'lib/watership/consumer.rb', line 97

def notify(exception, data)
  Airbrake.notify(exception) if defined?(Airbrake)
  Bugsnag.notify(exception, data: data) if defined?(Bugsnag)
end

#reject_message(tag, requeue = true) ⇒ Object



72
73
74
75
# File 'lib/watership/consumer.rb', line 72

def reject_message(tag, requeue = true)
  logger.info "Rejecting message"
  channel.reject(tag, requeue)
end