Class: Watership::Consumer
- Inherits:
-
Object
- Object
- Watership::Consumer
- Defined in:
- lib/watership/consumer.rb
Instance Method Summary
- #ack_message(tag) ⇒ Object
- #bind(name, opts = {}) ⇒ Object
- #channel ⇒ Object
- #clear_active_record_connections ⇒ Object
- #connection ⇒ Object
- #consume ⇒ Object
- #create_queue ⇒ Object
- #enqueue(message) ⇒ Object
-
#initialize(consumer, url, channel_options = {}, queue_options = {}) ⇒ Consumer
constructor
A new instance of Consumer.
- #logger ⇒ Object
- #notify(exception, data) ⇒ Object
- #reject_message(tag, requeue = true) ⇒ Object
Constructor Details
#initialize(consumer, url, channel_options = {}, queue_options = {}) ⇒ Consumer
Returns a new instance of Consumer.
7 8 9 10 11 12 13 14 |
# File 'lib/watership/consumer.rb', line 7
# Builds a consumer for the given worker class and broker URL.
#
# @param consumer [Object] stored as-is; other methods call `new.call(data)`
#   on it and read its `QUEUE` constant
# @param url [String] stored as-is and later handed to `Bunny.new`
# @param channel_options [Hash] may carry `:prefetch` and `:concurrency`,
#   which are extracted; the remainder is merged over `{durable: true}`
# @param queue_options [Hash] merged over `{block: false, manual_ack: true}`
# @return [Consumer] a new instance of Consumer
def initialize(consumer, url, channel_options = {}, queue_options = {})
  # Work on a copy: `delete` below must not mutate (or fail on a frozen)
  # hash that belongs to the caller.
  remaining_options = channel_options.dup

  @consumer = consumer
  @url = url
  @prefetch = remaining_options.delete(:prefetch) ||
              Integer(ENV.fetch("RABBIT_CONSUMER_PREFETCH", 200))
  @concurrency = remaining_options.delete(:concurrency) || 1
  @channel_opts = { durable: true }.merge(remaining_options)
  @queue_opts = { block: false, manual_ack: true }.merge(queue_options)
end
Instance Method Details
#ack_message(tag) ⇒ Object
67 68 69 70 |
# File 'lib/watership/consumer.rb', line 67
# Logs and acknowledges the delivery identified by +tag+ on the channel.
#
# @param tag [Object] delivery tag passed through to `channel.acknowledge`
# @return [Object] whatever `channel.acknowledge` returns
def ack_message(tag)
  logger.info "Acking message"
  # Second argument is passed as `false`; presumably Bunny's "multiple"
  # flag, i.e. only this one delivery is acknowledged -- confirm.
  channel.acknowledge(tag, false)
end
#bind(name, opts = {}) ⇒ Object
63 64 65 |
# File 'lib/watership/consumer.rb', line 63
# Binds the consumer's queue by delegating to `create_queue.bind`.
#
# @param name [Object] forwarded unchanged as the first argument of `bind`
# @param opts [Hash] forwarded unchanged as the second argument of `bind`
# @return [Object] whatever the queue's `bind` returns
def bind(name, opts = {})
  create_queue.bind(name, opts)
end
#channel ⇒ Object
85 86 87 88 89 90 91 |
# File 'lib/watership/consumer.rb', line 85
# Returns the memoized channel, creating it on first use.
#
# The channel is created via `connection.create_channel(nil, @concurrency)`
# and then has `prefetch(@prefetch)` applied before being cached.
#
# @return [Object] the channel returned by `connection.create_channel`
def channel
  @channel ||= begin
    created_channel = connection.create_channel(nil, @concurrency)
    created_channel.prefetch(@prefetch)
    created_channel
  end
end
#clear_active_record_connections ⇒ Object
106 107 108 109 110 |
# File 'lib/watership/consumer.rb', line 106
# Releases ActiveRecord connections after a message has been handled.
# Does nothing when ActiveRecord is not loaded.
#
# @return [Object, nil] result of the clearing call, or nil when skipped
def clear_active_record_connections
  return unless defined?(::ActiveRecord::Base)

  if ::ActiveRecord::Base.respond_to?(:clear_active_connections!)
    ::ActiveRecord::Base.clear_active_connections!
  elsif ::ActiveRecord::Base.respond_to?(:connection_handler)
    # Fallback for ActiveRecord versions where the class-level method is no
    # longer available (the Base-level delegation was deprecated in favour
    # of calling the connection handler directly) -- otherwise connections
    # would silently never be released.
    ::ActiveRecord::Base.connection_handler.clear_active_connections!
  end
end
#connection ⇒ Object
81 82 83 |
# File 'lib/watership/consumer.rb', line 81
# Returns the memoized Bunny connection for @url, starting it on first use.
#
# @return [Object] the object returned by `Bunny.new(@url)`
def connection
  @connection ||= Bunny.new(@url).tap(&:start)
end
#consume ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/watership/consumer.rb', line 16
# Starts @concurrency subscriber threads on the queue, then blocks the
# calling thread until it is interrupted or the process receives TERM.
# The channel is always closed on the way out.
#
# Note: sets `Thread.abort_on_exception = true` process-wide.
#
# @return [Thread, nil] the joined parking thread, or nil after an Interrupt
def consume
  Thread.abort_on_exception = true

  queue = create_queue

  @concurrency.times do
    Thread.new do
      queue.subscribe(@queue_opts.dup) do |delivery_info, _properties, payload|
        handle_delivery(delivery_info, payload)
      end
    end
  end

  # Park the calling thread; TERM terminates the parked thread, which lets
  # `join` return and the `ensure` below run.
  sleeping_thread = Thread.new { sleep }
  Signal.trap("TERM") { sleeping_thread.terminate }
  sleeping_thread.join
rescue Interrupt
  # Swallowed on purpose: fall through to the channel shutdown below.
ensure
  logger.info "Closing Channel"
  channel.close
end

# Internal helper for #consume: processes one delivery and settles it.
#
# * Worker succeeds                      -> ack.
# * Worker raises, payload is a Hash     -> notify, re-publish the payload
#   with "retries" incremented, then ack the original delivery.
# * Worker/parse raises, payload unusable (not JSON, or not a Hash) ->
#   notify and reject WITHOUT requeue; the message cannot be retried with a
#   counter, and requeueing it would redeliver it forever.
# * Interrupt, or a failure while scheduling the retry -> reject with requeue.
#
# @api private
# @param delivery_info [#delivery_tag] delivery metadata from the subscription
# @param payload [String] raw message body, expected to be JSON
def handle_delivery(delivery_info, payload)
  outcome = :requeue
  data = nil

  data = JSON.parse(payload)
  @consumer.new.call(data)
  outcome = :ack
rescue StandardError => exception
  logger.error "Error thrown in subscribe block"
  logger.error exception.message
  logger.error Array(exception.backtrace).join("\n")

  if data.is_a?(Hash)
    retries = data["retries"] || 0

    notify(exception, { payload: data, retries: retries })
    # String key: `data` comes from JSON.parse, so a symbol key here would
    # sit next to the existing "retries" entry and serialize as a duplicate.
    enqueue(data.merge("retries" => retries + 1))

    outcome = :ack
  else
    outcome = :discard
    notify(exception, { payload: data || payload, retries: 0 })
  end
rescue Interrupt
  logger.error "Interrupt in subscribe block"
  logger.warn "Stopped gracefully."
ensure
  case outcome
  when :ack then ack_message(delivery_info.delivery_tag)
  when :discard then reject_message(delivery_info.delivery_tag, false)
  else reject_message(delivery_info.delivery_tag)
  end
  clear_active_record_connections
end
#create_queue ⇒ Object
77 78 79 |
# File 'lib/watership/consumer.rb', line 77
# Looks up (or declares) the queue named by the consumer class's QUEUE
# constant on the channel, using the options stored in @channel_opts.
#
# @return [Object] whatever `channel.queue` returns
def create_queue
  channel.queue(@consumer::QUEUE, @channel_opts)
end
#enqueue(message) ⇒ Object
93 94 95 |
# File 'lib/watership/consumer.rb', line 93
# Serializes +message+ to JSON and publishes it to the consumer's queue.
#
# @param message [Object] anything `JSON.generate` accepts
# @return [Object] whatever the queue's `publish` returns
def enqueue(message)
  create_queue.publish(JSON.generate(message))
end
#logger ⇒ Object
102 103 104 |
# File 'lib/watership/consumer.rb', line 102
# Returns the memoized logger: `Rails.logger` when Rails is loaded and has
# one, otherwise a `Logger` writing to STDOUT.
#
# @return [Object] a logger responding to the usual `info`/`warn`/`error`
def logger
  # `Rails` can be defined while `Rails.logger` is still nil (e.g. before the
  # application has initialized); fall back instead of returning nil, which
  # would make every `logger.info` call raise.
  @logger ||= (defined?(Rails) && Rails.respond_to?(:logger) && Rails.logger) ||
              Logger.new(STDOUT)
end
#notify(exception, data) ⇒ Object
97 98 99 100 |
# File 'lib/watership/consumer.rb', line 97
# Reports +exception+ to whichever error trackers are loaded.
#
# Airbrake (if defined) receives only the exception; Bugsnag (if defined)
# additionally receives +data+ under the `:data` key.
#
# @param exception [Exception] the error to report
# @param data [Object] extra context, forwarded to Bugsnag only
# @return [Object, nil] result of the last notifier call, or nil
def notify(exception, data)
  Airbrake.notify(exception) if defined?(Airbrake)
  Bugsnag.notify(exception, data: data) if defined?(Bugsnag)
end
#reject_message(tag, requeue = true) ⇒ Object
72 73 74 75 |
# File 'lib/watership/consumer.rb', line 72
# Logs and rejects the delivery identified by +tag+ on the channel.
#
# @param tag [Object] delivery tag passed through to `channel.reject`
# @param requeue [Boolean] forwarded to `channel.reject`; defaults to true
# @return [Object] whatever `channel.reject` returns
def reject_message(tag, requeue = true)
  logger.info "Rejecting message"
  channel.reject(tag, requeue)
end