Class: Messagebus::Swarm::Drone

Inherits:
Object
Defined in:
lib/messagebus/swarm/drone.rb,
lib/messagebus/swarm/drone/logging_worker.rb

Overview

This is a composition of a consumer and a separate message processor. It allows you to use Plain-Old-Ruby-Objects to do message processing. See #initialize for how the messages are delegated.

Defined Under Namespace

Classes: AbortProcessing, LoggingWorker

Constant Summary

INITIALIZING = "initializing"
RECEIVING = "receiving"
PROCESSING = "processing"
COMPLETED = "completed"
STOP_PROCESSING_MESSAGE = "stop processing message"

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(options) ⇒ Drone

Expected options:

  • :ack_on_error (default false): whether to acknowledge the message when processing raised an error. The older :auto_acknowledge option is still accepted as an alias for backwards compatibility.

  • :consumer (required): a consumer object for the destination

  • :destination_name (required): the message bus queue/topic name

  • :id: an id for this drone (just used for debugging)

  • :logger (required): the logger to publish messages to

  • :worker_class (required): the actual worker that will be used to do the processing

As messages arrive, each one is passed to the worker’s perform_on_destination or perform method. The first of the following methods that the worker defines is called:

  • perform_on_destination(message_payload, destination_message_came_from)

  • perform(message_payload)
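
The priority order above can be sketched as follows. This is only an illustration: it assumes the worker exposes these as class-level methods (the drone receives a :worker_class, but this page does not show how the methods are invoked), and EchoWorker, RoutingWorker and dispatch are hypothetical names, not part of the library.

```ruby
# Hypothetical worker that only knows about the payload.
class EchoWorker
  def self.perform(message_payload)
    "echo: #{message_payload}"
  end
end

# Hypothetical worker that also wants to know where the message came from.
# It defines both methods; perform_on_destination should win.
class RoutingWorker
  def self.perform_on_destination(message_payload, destination_message_came_from)
    "#{destination_message_came_from}: #{message_payload}"
  end

  def self.perform(message_payload)
    "ignored: #{message_payload}"
  end
end

# Assumed dispatch rule: prefer perform_on_destination, fall back to perform.
def dispatch(worker_class, payload, destination)
  if worker_class.respond_to?(:perform_on_destination)
    worker_class.perform_on_destination(payload, destination)
  elsif worker_class.respond_to?(:perform)
    worker_class.perform(payload)
  else
    raise NoMethodError, "#{worker_class} defines neither perform method"
  end
end

echo_result    = dispatch(EchoWorker, "hi", "jms.topic.demo")
routing_result = dispatch(RoutingWorker, "hi", "jms.topic.demo")
```

With both methods defined, only perform_on_destination runs, so a worker can add destination awareness without removing its existing perform.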

A message is processed by:

message = receive_message
begin
  worker.perform(message)
  ack(message)
rescue AbortProcessing
  # raise this error if you want to say "Don't ack me"
rescue StandardError
  ack(message) if ack_on_error
end
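
To make the three outcomes of that flow concrete, here is a small self-contained sketch. Everything in it is a stand-in: AbortProcessing is redefined locally (assumed to be a StandardError subclass), RecordingConsumer and process_one are hypothetical, and the worker is modelled as a lambda.

```ruby
# Local stand-in for Messagebus::Swarm::Drone::AbortProcessing (assumption).
class AbortProcessing < StandardError; end

# Hypothetical consumer that just records which messages were acked.
class RecordingConsumer
  attr_reader :acked

  def initialize
    @acked = []
  end

  def ack(message)
    @acked << message
  end
end

# Mirrors the flow described above for a single message.
def process_one(consumer, worker, message, ack_on_error: false)
  worker.call(message)
  consumer.ack(message)
  :acked
rescue AbortProcessing
  # The worker asked not to be acked.
  :aborted
rescue StandardError
  if ack_on_error
    consumer.ack(message)
    :acked_after_error
  else
    :unacked_after_error
  end
end

consumer = RecordingConsumer.new
ok    = ->(m) { m.upcase }
abort = ->(_m) { raise AbortProcessing }
boom  = ->(_m) { raise ArgumentError, "bad payload" }

results = [
  process_one(consumer, ok, "m1"),
  process_one(consumer, abort, "m2"),
  process_one(consumer, boom, "m3"),
  process_one(consumer, boom, "m4", ack_on_error: true)
]
```

Only "m1" and "m4" end up acked: a successful run always acks, AbortProcessing never does, and any other error acks only when ack_on_error is set.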


# File 'lib/messagebus/swarm/drone.rb', line 78

def initialize(options)
  @auto_acknowledge = options.fetch(:ack_on_error, options[:auto_acknowledge])
  @consumer, @destination_name, @worker_class, @id, @logger = options.values_at(:consumer, :destination_name, :worker_class, :id, :logger)
  @state = INITIALIZING
  @logger.debug { "initializing a drone drone_id=#{@id}" }
end
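
The first line of the constructor decides how :ack_on_error and its older alias interact. The sketch below isolates that one expression in a hypothetical helper, resolve_ack_on_error, so the precedence can be checked without a consumer or logger.

```ruby
# Hypothetical helper wrapping the same expression the constructor uses.
def resolve_ack_on_error(options)
  options.fetch(:ack_on_error, options[:auto_acknowledge])
end

neither    = resolve_ack_on_error({})
alias_only = resolve_ack_on_error({ auto_acknowledge: true })
both       = resolve_ack_on_error({ ack_on_error: false, auto_acknowledge: true })
```

When :ack_on_error is present it wins, even if it is false. The alias is consulted only when :ack_on_error is absent, and with neither key the result is nil, which behaves as the documented default of false.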

Instance Attribute Details

#id ⇒ Object (readonly)

Returns the value of attribute id.



# File 'lib/messagebus/swarm/drone.rb', line 51

def id
  @id
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



# File 'lib/messagebus/swarm/drone.rb', line 51

def state
  @state
end

Instance Method Details

#processing_loop ⇒ Object

This is the main loop for the drone’s work. It does not return until the drone is stopped via #stop.

If the consumer has not been started yet, this method starts it. In that case it also stops the consumer when the loop exits; a consumer that was already started is left running.



# File 'lib/messagebus/swarm/drone.rb', line 91

def processing_loop
  @processing = true

  auto_started_consumer = false
  begin
    if !@consumer.started?
      @logger.debug "auto starting the consumer drone_id=#{@id}"
      @consumer.start
      auto_started_consumer = true
    end

    while @processing
      message = nil
      begin
        @logger.debug "waiting for a message"

        @state = RECEIVING
        message = @consumer.receive
        # intentional === here, this is used as a signal, so we can use object equality
        # to check if we received the signal
        if message === STOP_PROCESSING_MESSAGE
          @logger.info "received a stop message, exiting drone_id=#{@id}, message=#{message.inspect}"
          @state = COMPLETED
          next
        end

        @logger.info "received message drone_id=#{@id}, message_id=#{message.message_id}"

        @state = PROCESSING
        @logger.info "processing message drone_id=#{@id}, message_id=#{message.message_id}, worker=#{@worker_class.name}"

        raw_message = extract_underlying_message_body(message)
        @logger.debug { "drone_id=#{@id} message_id=#{message.message_id}, message=#{raw_message.inspect}" }

        worker_perform(@logger, @destination_name, @worker_class, @consumer, @auto_acknowledge, message, raw_message)

        @state = COMPLETED
      rescue => except
        @logger.warn "exception processing message drone_id=#{@id}, message_id=#{message && message.message_id}, exception=\"#{except.message}\", exception_backtrace=#{except.backtrace.join("|")}"
      end
    end
  ensure
    if auto_started_consumer
      @logger.debug "auto stopping the consumer drone_id=#{@id}"
      @consumer.stop
    end
  end

  @logger.info("gracefully exited drone_id=#{@id}")
end
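
The auto-start behaviour in the listing above follows a common "stop only what you started" pattern. A minimal sketch of that pattern on its own, using a hypothetical FakeConsumer and with_consumer helper rather than a real message bus consumer:

```ruby
# Hypothetical consumer that records lifecycle events.
class FakeConsumer
  attr_reader :events

  def initialize(started: false)
    @started = started
    @events = []
  end

  def started?
    @started
  end

  def start
    @started = true
    @events << :start
  end

  def stop
    @started = false
    @events << :stop
  end
end

# Starts the consumer if needed and stops it again only in that case,
# even when the block raises.
def with_consumer(consumer)
  auto_started = false
  begin
    unless consumer.started?
      consumer.start
      auto_started = true
    end
    yield consumer
  ensure
    consumer.stop if auto_started
  end
end

fresh = FakeConsumer.new
with_consumer(fresh) { |c| c.events << :work }

running = FakeConsumer.new(started: true)
with_consumer(running) { |c| c.events << :work }
```

The fresh consumer is started and stopped around the work, while the already-running one is left untouched, so a caller that manages the consumer itself keeps control of its lifetime.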

#stop ⇒ Object

Stops this drone from processing any additional jobs. A job that is already in progress is not aborted; it runs to completion before the loop exits.



# File 'lib/messagebus/swarm/drone.rb', line 145

def stop
  @logger.info("received stop message, current_state=#{@state}, processing=#{@processing}, drone_id=#{@id}")
  return if !@processing

  @processing = false
  @consumer.insert_sentinel_value(STOP_PROCESSING_MESSAGE)
end