Class: Cuniculus::Consumer
- Inherits:
-
Object
- Object
- Cuniculus::Consumer
- Defined in:
- lib/cuniculus/consumer.rb
Constant Summary collapse
- POLL_TIME =
5
- JOB_REQUIRED_KEYS =
%w[class args].freeze
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#exchange ⇒ Object
readonly
Returns the value of attribute exchange.
-
#job_queue ⇒ Object
readonly
Returns the value of attribute job_queue.
-
#queue_config ⇒ Object
readonly
Returns the value of attribute queue_config.
Instance Method Summary collapse
- #constantize(str) ⇒ Object
- #handle_error(e) ⇒ Object
-
#initialize(queue_config, channel) ⇒ Consumer
constructor
A new instance of Consumer.
- #maybe_retry(delivery_info, item) ⇒ Object
- #parse_job(payload) ⇒ Object
- #run_job(delivery_info, _properties, payload) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(queue_config, channel) ⇒ Consumer
Returns a new instance of Consumer.
13 14 15 16 |
# File 'lib/cuniculus/consumer.rb', line 13 def initialize(queue_config, channel) @channel = channel @queue_config = queue_config end |
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
11 12 13 |
# File 'lib/cuniculus/consumer.rb', line 11 def channel @channel end |
#exchange ⇒ Object (readonly)
Returns the value of attribute exchange.
11 12 13 |
# File 'lib/cuniculus/consumer.rb', line 11 def exchange @exchange end |
#job_queue ⇒ Object (readonly)
Returns the value of attribute job_queue.
11 12 13 |
# File 'lib/cuniculus/consumer.rb', line 11 def job_queue @job_queue end |
#queue_config ⇒ Object (readonly)
Returns the value of attribute queue_config.
11 12 13 |
# File 'lib/cuniculus/consumer.rb', line 11 def queue_config @queue_config end |
Instance Method Details
#constantize(str) ⇒ Object
80 81 82 83 84 85 86 87 88 89 |
# File 'lib/cuniculus/consumer.rb', line 80 def constantize(str) return Object.const_get(str) unless str.include?("::") names = str.split("::") names.shift if names.empty? || names.first.empty? names.inject(Object) do |constant, name| constant.const_get(name, false) end end |
#handle_error(e) ⇒ Object
75 76 77 78 |
# File 'lib/cuniculus/consumer.rb', line 75 def handle_error(e) Cuniculus.logger.error("#{e.class.name}: #{e.}") Cuniculus.logger.error(e.backtrace.join("\n")) unless e.backtrace.nil? end |
#maybe_retry(delivery_info, item) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/cuniculus/consumer.rb', line 57 def maybe_retry(delivery_info, item) retry_count = item["_cun_retries"].to_i retry_queue_name = job_queue.retry_queue(retry_count) unless retry_queue_name channel.nack(delivery_info.delivery_tag, false, false) return end payload = Cuniculus.dump_job(item.merge("_cun_retries" => retry_count + 1)) exchange.publish( payload, { routing_key: retry_queue_name, persistent: true } ) channel.ack(delivery_info.delivery_tag, false) end |
#parse_job(payload) ⇒ Object
46 47 48 49 50 51 52 53 54 55 |
# File 'lib/cuniculus/consumer.rb', line 46 def parse_job(payload) msg = Cuniculus.load_job(payload) raise Cuniculus::BadlyFormattedPayload, "Consumed message with missing information: #{payload}\nIt should have keys [#{JOB_REQUIRED_KEYS.join(', ')}]" unless (JOB_REQUIRED_KEYS - msg.keys).empty? msg rescue Cuniculus::BadlyFormattedPayload raise rescue StandardError => ex raise Cuniculus.convert_exception_class(ex, Cuniculus::BadlyFormattedPayload), "Badly formatted consumed message: #{payload}" end |
#run_job(delivery_info, _properties, payload) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/cuniculus/consumer.rb', line 31 def run_job(delivery_info, _properties, payload) item = parse_job(payload) klass = Object.const_get(item["class"]) worker = klass.new worker.perform(*item["args"]) channel.ack(delivery_info.delivery_tag, false) rescue Cuniculus::BadlyFormattedPayload => ex handle_error(ex) # If parse failed, send message straight to DLX channel.nack(delivery_info.delivery_tag, false, false) rescue StandardError => ex handle_error(Cuniculus.convert_exception_class(ex, Cuniculus::Error)) maybe_retry(delivery_info, item) end |
#start ⇒ Object
18 19 20 21 22 23 24 |
# File 'lib/cuniculus/consumer.rb', line 18 def start @exchange = channel.direct(Cuniculus::CUNICULUS_EXCHANGE, { durable: true }) @job_queue = queue_config.declare!(channel) @_consumer = job_queue.subscribe(manual_ack: true, block: false) do |delivery_info, properties, payload| run_job(delivery_info, properties, payload) end end |
#stop ⇒ Object
26 27 28 29 |
# File 'lib/cuniculus/consumer.rb', line 26 def stop @_consumer&.cancel channel.close unless channel.closed? end |