Class: Hutch::Worker
Instance Method Summary collapse
- #acknowledge_error(delivery_info, properties, broker, ex) ⇒ Object
- #consumers=(val) ⇒ Object
- #error_acknowledgements ⇒ Object
- #handle_error(*args) ⇒ Object
-
#handle_message(consumer, delivery_info, properties, payload) ⇒ Object
Called internally when a new message comes in from RabbitMQ.
-
#initialize(broker, consumers, setup_procs) ⇒ Worker
constructor
A new instance of Worker.
-
#run ⇒ Object
Run the main event loop.
-
#setup_queue(consumer) ⇒ Object
Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.
-
#setup_queues ⇒ Object
Set up the queues for each of the worker’s consumers.
-
#stop ⇒ Object
Stop a running worker by killing all subscriber threads.
- #with_tracing(klass) ⇒ Object
Methods included from Logging
#logger, logger, logger=, setup_logger
Constructor Details
#initialize(broker, consumers, setup_procs) ⇒ Worker
Returns a new instance of Worker.
13 14 15 16 17 |
# File 'lib/hutch/worker.rb', line 13
#
# Builds a worker around the given broker.
#
# @param broker the broker object; stored as-is in @broker
# @param consumers the consumers to work with; assigned through the
#   `consumers=` writer
# @param setup_procs assigned through the `setup_procs=` writer (its
#   definition is not part of this listing)
def initialize(broker, consumers, setup_procs)
  @broker = broker
  self.consumers, self.setup_procs = consumers, setup_procs
end
Instance Method Details
#acknowledge_error(delivery_info, properties, broker, ex) ⇒ Object
90 91 92 93 94 95 96 |
# File 'lib/hutch/worker.rb', line 90
#
# Offers the failure to each acknowledgement backend in turn and stops at
# the first one whose `handle` returns a truthy value. A fresh
# Hutch::Acknowledgements::NackOnAllFailures is always appended as the last
# backend in the chain.
#
# @param delivery_info passed through to each backend
# @param properties passed through to each backend
# @param broker passed through to each backend
# @param ex the exception being acknowledged
# @return the first backend whose `handle` was truthy, or nil if none was
def acknowledge_error(delivery_info, properties, broker, ex)
  handlers = error_acknowledgements
  handlers += [Hutch::Acknowledgements::NackOnAllFailures.new]
  handlers.find { |handler| handler.handle(delivery_info, properties, broker, ex) }
end
#consumers=(val) ⇒ Object
98 99 100 101 102 103 |
# File 'lib/hutch/worker.rb', line 98
#
# Stores the consumer collection in @consumers, logging a warning first
# when the collection is empty.
#
# @param val the consumers; must respond to `empty?`
def consumers=(val)
  logger.warn "no consumer loaded, ensure there's no configuration issue" if val.empty?
  @consumers = val
end
#error_acknowledgements ⇒ Object
105 106 107 |
# File 'lib/hutch/worker.rb', line 105
#
# Reads the configured acknowledgement backends.
#
# @return whatever is stored under Hutch::Config[:error_acknowledgements]
def error_acknowledgements
  Hutch::Config[:error_acknowledgements]
end
#handle_error(*args) ⇒ Object
84 85 86 87 88 |
# File 'lib/hutch/worker.rb', line 84
#
# Forwards the given arguments, unchanged, to the `handle` method of every
# backend listed under Hutch::Config[:error_handlers].
#
# @param args arguments describing the failure; passed straight through
def handle_error(*args)
  Hutch::Config[:error_handlers].each { |handler| handler.handle(*args) }
end
#handle_message(consumer, delivery_info, properties, payload) ⇒ Object
Called internally when a new message comes in from RabbitMQ. Responsible for wrapping up the message and passing it to the consumer.
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# NOTE(review): this extracted listing has lost several identifiers and is not
# runnable as shown. Missing: the method name after `def` (the page headings
# give it as handle_message), the attribute read after `properties.`, the local
# variable assigned from Message.new, the argument passed to `handle`, and the
# predicate called on `consumer_instance.` in the `unless` guard.
# TODO: restore these from lib/hutch/worker.rb before relying on this snippet.
#
# Flow visible in the listing: choose the consumer's serializer or fall back to
# Hutch::Config[:serializer]; log a debug line with routing key, consumer and
# payload (byte size only when the serializer reports binary?); build a Message;
# instantiate the consumer and give it @broker and delivery_info; call `handle`
# through with_tracing; ack the delivery tag unless the (missing) predicate is
# truthy. Any StandardError is passed to acknowledge_error and then handle_error.
# File 'lib/hutch/worker.rb', line 61 def (consumer, delivery_info, properties, payload) serializer = consumer.get_serializer || Hutch::Config[:serializer] logger.debug { spec = serializer.binary? ? "#{payload.bytesize} bytes" : "#{payload}" "message(#{properties. || '-'}): " + "routing key: #{delivery_info.routing_key}, " + "consumer: #{consumer}, " + "payload: #{spec}" } = Message.new(delivery_info, properties, payload, serializer) consumer_instance = consumer.new.tap { |c| c.broker, c.delivery_info = @broker, delivery_info } with_tracing(consumer_instance).handle() @broker.ack(delivery_info.delivery_tag) unless consumer_instance. rescue => ex acknowledge_error(delivery_info, properties, @broker, ex) handle_error(properties, payload, consumer, ex) end |
#run ⇒ Object
Run the main event loop. The consumers will be set up with queues, and process the messages in their respective queues. This method blocks in Waiter.wait_until_signaled; once that call returns, the worker is stopped and the method returns.
22 23 24 25 26 27 28 29 |
# File 'lib/hutch/worker.rb', line 22
#
# Runs the worker: sets up the queues, invokes every setup proc, blocks in
# Waiter.wait_until_signaled, and calls `stop` once that wait returns.
def run
  setup_queues
  setup_procs.each { |setup_proc| setup_proc.call }
  Waiter.wait_until_signaled
  stop
end
#setup_queue(consumer) ⇒ Object
Bind a consumer’s routing keys to its queue, and set up a subscription to receive messages sent to the queue.
47 48 49 50 51 52 53 54 55 56 57 |
# NOTE(review): this extracted listing has lost identifiers and is not runnable
# as shown. Missing: the method called after `consumer.` in the second argument
# to @broker.queue, the method called on `Hutch::Adapter.`, and the name of the
# method invoked inside the subscribe block (its argument list matches the
# handle_message signature on this page — presumably that; confirm).
# TODO: restore these from lib/hutch/worker.rb before relying on this snippet.
#
# Flow visible in the listing: log the queue name, obtain the queue from
# @broker, bind it to the consumer's routing_keys, then subscribe with
# manual_ack: true and a unique consumer tag; each delivery is unpacked into
# delivery_info, properties and payload and handed on together with the consumer.
# File 'lib/hutch/worker.rb', line 47 def setup_queue(consumer) logger.info "setting up queue: #{consumer.get_queue_name}" queue = @broker.queue(consumer.get_queue_name, consumer.) @broker.bind_queue(queue, consumer.routing_keys) queue.subscribe(consumer_tag: unique_consumer_tag, manual_ack: true) do |*args| delivery_info, properties, payload = Hutch::Adapter.(*args) (consumer, delivery_info, properties, payload) end end |
#setup_queues ⇒ Object
Set up the queues for each of the worker’s consumers.
37 38 39 40 41 42 43 |
# File 'lib/hutch/worker.rb', line 37
#
# Sets up a queue for every consumer in @consumers that is not filtered out.
# A consumer is filtered out only when group_configured? is truthy and
# group_restricted?(consumer) is truthy as well. Filtering is completed for
# all consumers before any queue is set up.
#
# @return the collection of consumers whose queues were set up
def setup_queues
  logger.info 'setting up queues'
  eligible = @consumers.select do |consumer|
    # Inverse of "group configured AND restricted"; short-circuits the same way.
    !group_configured? || !group_restricted?(consumer)
  end
  eligible.each { |consumer| setup_queue(consumer) }
end
#stop ⇒ Object
Stop a running worker by killing all subscriber threads.
32 33 34 |
# File 'lib/hutch/worker.rb', line 32
#
# Stops the worker by delegating to the broker's `stop`.
#
# @return whatever @broker.stop returns
def stop
  @broker.stop
end