Class: Cuniculus::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/cuniculus/consumer.rb

Constant Summary collapse

POLL_TIME =
5
JOB_REQUIRED_KEYS =
%w[class args].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_config, channel) ⇒ Consumer

Returns a new instance of Consumer.



13
14
15
16
# File 'lib/cuniculus/consumer.rb', line 13

def initialize(queue_config, channel)
  @channel = channel
  @queue_config = queue_config
end

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



11
12
13
# File 'lib/cuniculus/consumer.rb', line 11

def channel
  @channel
end

#exchangeObject (readonly)

Returns the value of attribute exchange.



11
12
13
# File 'lib/cuniculus/consumer.rb', line 11

def exchange
  @exchange
end

#job_queueObject (readonly)

Returns the value of attribute job_queue.



11
12
13
# File 'lib/cuniculus/consumer.rb', line 11

def job_queue
  @job_queue
end

#queue_configObject (readonly)

Returns the value of attribute queue_config.



11
12
13
# File 'lib/cuniculus/consumer.rb', line 11

def queue_config
  @queue_config
end

Instance Method Details

#constantize(str) ⇒ Object



80
81
82
83
84
85
86
87
88
89
# File 'lib/cuniculus/consumer.rb', line 80

def constantize(str)
  return Object.const_get(str) unless str.include?("::")

  names = str.split("::")
  names.shift if names.empty? || names.first.empty?

  names.inject(Object) do |constant, name|
    constant.const_get(name, false)
  end
end

#handle_error(e) ⇒ Object



75
76
77
78
# File 'lib/cuniculus/consumer.rb', line 75

def handle_error(e)
  Cuniculus.logger.error("#{e.class.name}: #{e.message}")
  Cuniculus.logger.error(e.backtrace.join("\n")) unless e.backtrace.nil?
end

#maybe_retry(delivery_info, item) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/cuniculus/consumer.rb', line 57

def maybe_retry(delivery_info, item)
  retry_count = item["_cun_retries"].to_i
  retry_queue_name = job_queue.retry_queue(retry_count)
  unless retry_queue_name
    channel.nack(delivery_info.delivery_tag, false, false)
    return
  end
  payload = Cuniculus.dump_job(item.merge("_cun_retries" => retry_count + 1))
  exchange.publish(
    payload,
    {
      routing_key: retry_queue_name,
      persistent: true
    }
  )
  channel.ack(delivery_info.delivery_tag, false)
end

#parse_job(payload) ⇒ Object



46
47
48
49
50
51
52
53
54
55
# File 'lib/cuniculus/consumer.rb', line 46

def parse_job(payload)
  msg = Cuniculus.load_job(payload)
  raise Cuniculus::BadlyFormattedPayload, "Consumed message with missing information: #{payload}\nIt should have keys [#{JOB_REQUIRED_KEYS.join(', ')}]" unless (JOB_REQUIRED_KEYS - msg.keys).empty?

  msg
rescue Cuniculus::BadlyFormattedPayload
  raise
rescue StandardError => ex
  raise Cuniculus.convert_exception_class(ex, Cuniculus::BadlyFormattedPayload), "Badly formatted consumed message: #{payload}"
end

#run_job(delivery_info, _properties, payload) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/cuniculus/consumer.rb', line 31

def run_job(delivery_info, _properties, payload)
  item = parse_job(payload)
  klass = Object.const_get(item["class"])
  worker = klass.new
  worker.perform(*item["args"])
  channel.ack(delivery_info.delivery_tag, false)
rescue Cuniculus::BadlyFormattedPayload => ex
  handle_error(ex)
  # If parse failed, send message straight to DLX
  channel.nack(delivery_info.delivery_tag, false, false)
rescue StandardError => ex
  handle_error(Cuniculus.convert_exception_class(ex, Cuniculus::Error))
  maybe_retry(delivery_info, item)
end

#startObject



18
19
20
21
22
23
24
# File 'lib/cuniculus/consumer.rb', line 18

def start
  @exchange = channel.direct(Cuniculus::CUNICULUS_EXCHANGE, { durable: true })
  @job_queue = queue_config.declare!(channel)
  @_consumer = job_queue.subscribe(manual_ack: true, block: false) do |delivery_info, properties, payload|
    run_job(delivery_info, properties, payload)
  end
end

#stopObject



26
27
28
29
# File 'lib/cuniculus/consumer.rb', line 26

def stop
  @_consumer&.cancel
  channel.close unless channel.closed?
end