Class: Messagebus::Swarm::Drone
- Inherits:
  Object
  - Object
  - Messagebus::Swarm::Drone
- Defined in:
  lib/messagebus/swarm/drone.rb,
  lib/messagebus/swarm/drone/logging_worker.rb
Overview
This is a composition of a consumer and a separate message processor. It allows you to use Plain-Old-Ruby-Objects to do message processing. See #initialize for how the messages are delegated.
Defined Under Namespace
Classes: AbortProcessing, LoggingWorker
Constant Summary
- INITIALIZING = "initializing"
- RECEIVING = "receiving"
- PROCESSING = "processing"
- COMPLETED = "completed"
- STOP_PROCESSING_MESSAGE = "stop processing message"
Instance Attribute Summary
- #id ⇒ Object (readonly)
  Returns the value of attribute id.
- #state ⇒ Object (readonly)
  Returns the value of attribute state.
Instance Method Summary
- #initialize(options) ⇒ Drone (constructor)
  Expected options include :ack_on_error (default false): whether to ack the message when an error was raised.
- #processing_loop ⇒ Object
  This is the main loop for the drone’s work.
- #stop ⇒ Object
  Stops this drone from processing any additional jobs.
Constructor Details
#initialize(options) ⇒ Drone
Expected options:
- :ack_on_error (default false): whether to ack the message when an error was raised. Aliased to auto_acknowledge for backwards compatibility.
- :consumer (required): a consumer object for that topic
- :destination_name (required): the message bus queue/topic name
- :id: an id for this drone (just used for debugging)
- :logger (required): the logger to publish messages to
- :worker_class (required): the actual worker that will be used to do the processing
As messages arrive, they are passed to the worker’s perform or perform_on_destination method. The methods are tried in the following priority order (if they exist):
- perform_on_destination(message_payload, destination_message_came_from)
- perform(message_payload)
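The priority rule above can be sketched in plain Ruby. This is a minimal illustration, not the library's dispatch code: `dispatch_to_worker`, `EchoWorker`, and `RoutingWorker` are hypothetical names, and the sketch assumes the methods are looked up with `respond_to?` on whatever worker object it is given (this page does not say whether the library calls them on the class or on an instance).

```ruby
# Hypothetical helper (not part of Messagebus): picks the worker method
# following the documented priority order.
def dispatch_to_worker(worker, payload, destination)
  if worker.respond_to?(:perform_on_destination)
    # Highest priority: the worker also wants to know where the message came from.
    worker.perform_on_destination(payload, destination)
  elsif worker.respond_to?(:perform)
    worker.perform(payload)
  else
    raise NoMethodError, "#{worker.class} defines neither perform_on_destination nor perform"
  end
end

# Plain Ruby workers; the names are illustrative only.
class EchoWorker
  def perform(payload)
    "echo: #{payload}"
  end
end

class RoutingWorker
  def perform(payload)
    "not reached while perform_on_destination exists"
  end

  def perform_on_destination(payload, destination)
    "#{destination} -> #{payload}"
  end
end

puts dispatch_to_worker(EchoWorker.new, "hi", "example.queue")
puts dispatch_to_worker(RoutingWorker.new, "hi", "example.queue")
```

A worker that only needs the payload can define perform alone; one that serves several destinations can define perform_on_destination and branch on the destination name.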
A message is processed by:
  begin
    worker.perform()
    ack()
  rescue AbortProcessing
    # raise this error if you want to say "Don't ack me"
  rescue StandardError
    ack() if ack_on_error
  end
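To make the acknowledgement rules concrete, here is a small runnable sketch of the same flow. It is an illustration under assumptions, not the library's code: `process_with_ack` is a hypothetical helper, the local `AbortProcessing` class stands in for Messagebus::Swarm::Drone::AbortProcessing (assumed here to descend from StandardError), and acking is modelled as a lambda that counts calls.

```ruby
# Local stand-in for Messagebus::Swarm::Drone::AbortProcessing (assumed superclass).
class AbortProcessing < StandardError; end

# Hypothetical helper mirroring the documented flow.
# `work` is the block doing the processing; `ack` is any callable that acks.
def process_with_ack(ack_on_error:, ack:, &work)
  work.call
  ack.call
  :acked
rescue AbortProcessing
  # The worker explicitly asked not to be acked.
  :not_acked
rescue StandardError
  if ack_on_error
    ack.call
    :acked_after_error
  else
    :not_acked
  end
end

acks = 0
ack = -> { acks += 1 }

p process_with_ack(ack_on_error: false, ack: ack) { :ok }                  # success: acked
p process_with_ack(ack_on_error: false, ack: ack) { raise AbortProcessing } # never acked
p process_with_ack(ack_on_error: false, ack: ack) { raise "boom" }          # error, no ack
p process_with_ack(ack_on_error: true,  ack: ack) { raise "boom" }          # error, acked anyway
p acks
```

AbortProcessing has to be rescued before StandardError in this sketch, otherwise the ack_on_error branch would also catch it and could ack a message the worker wanted left unacknowledged.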
# File 'lib/messagebus/swarm/drone.rb', line 78
def initialize(options)
  @auto_acknowledge = options.fetch(:ack_on_error, options[:auto_acknowledge])
  @consumer, @destination_name, @worker_class, @id, @logger = options.values_at(:consumer, :destination_name, :worker_class, :id, :logger)
  @state = INITIALIZING
  @logger.debug { "initializing a drone drone_id=#{@id}" }
end
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id.
# File 'lib/messagebus/swarm/drone.rb', line 51
def id
  @id
end
#state ⇒ Object (readonly)
Returns the value of attribute state.
# File 'lib/messagebus/swarm/drone.rb', line 51
def state
  @state
end
Instance Method Details
#processing_loop ⇒ Object
This is the main loop for the drone’s work. This will not return until the drone is stopped via #stop.
If the consumer hasn’t been started yet, this method starts it. In that case it also stops the consumer automatically when the loop exits.
# File 'lib/messagebus/swarm/drone.rb', line 91
def processing_loop
  @processing = true

  auto_started_consumer = false
  begin
    if !@consumer.started?
      @logger.debug "auto starting the consumer drone_id=#{@id}"
      @consumer.start
      auto_started_consumer = true
    end

    while @processing
      = nil
      begin
        @logger.debug "waiting for a message"

        @state = RECEIVING
        = @consumer.receive

        # intentional === here, this is used as a signal, so we can use object equality
        # to check if we received the signal
        if === STOP_PROCESSING_MESSAGE
          @logger.info "received a stop message, exiting drone_id=#{@id}, message=#{.inspect}"
          @state = COMPLETED
          next
        end

        @logger.info "received message drone_id=#{@id}, message_id=#{.}"

        @state = PROCESSING
        @logger.info "processing message drone_id=#{@id}, message_id=#{.}, worker=#{@worker_class.name}"

        = ()
        @logger.debug { "drone_id=#{@id} message_id=#{.}, message=#{.inspect}" }

        worker_perform(@logger, @destination_name, @worker_class, @consumer, @auto_acknowledge, , )

        @state = COMPLETED
      rescue => except
        @logger.warn "exception processing message drone_id=#{@id}, message_id=#{ && .}, exception=\"#{except.}\", exception_backtrace=#{except.backtrace.join("|")}"
      end
    end
  ensure
    if auto_started_consumer
      @logger.debug "auto stopping the consumer drone_id=#{@id}"
      @consumer.stop
    end
  end

  @logger.info("gracefully exited drone_id=#{@id}")
end
#stop ⇒ Object
Stops this drone from processing any additional jobs. This will not abort any in-progress jobs.
# File 'lib/messagebus/swarm/drone.rb', line 145
def stop
  @logger.info("received stop message, current_state=#{@state}, processing=#{@processing}, drone_id=#{@id}")
  return if !@processing

  @processing = false
  @consumer.insert_sentinel_value(STOP_PROCESSING_MESSAGE)
end
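The stop mechanism (clear a flag, then push a sentinel so a blocked receive wakes up) can be sketched with Ruby's built-in Queue. This is only an illustration of the pattern under stated assumptions: the queue stands in for the consumer, `STOP`, `processed`, and the `stop` lambda are hypothetical names, and the sentinel is compared by object identity with `equal?`, which may differ from how the library compares it.

```ruby
# Stand-in sentinel, playing the role of STOP_PROCESSING_MESSAGE.
STOP = String.new("stop processing message").freeze

queue = Queue.new    # stand-in for the consumer's blocking receive
processed = []
processing = true

drone = Thread.new do
  while processing
    message = queue.pop              # blocks until something arrives
    next if message.equal?(STOP)     # identity check: only the sentinel object itself matches
    processed << message
  end
end

# Mirrors #stop: clear the flag, then wake the blocked loop with the sentinel.
stop = lambda do
  processing = false
  queue << STOP
end

queue << "job 1"
queue << String.new("stop processing message")  # same text, different object: a normal job
sleep 0.01 until queue.empty?                   # let the loop pick up both jobs first
stop.call
drone.join

p processed  # → ["job 1", "stop processing message"]
```

Without the sentinel, clearing the flag alone would not end the loop until the next real message arrived, because the thread would still be blocked inside the receive call.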