Class: Maestro::Messenger::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/maestro_common/mq/messenger.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, default_handler = nil, opts = {}) ⇒ Queue

Minimum requirements: a queue name. A default handler for messages of unknown type is optional; without one, unknown messages are dropped.

Raises:

  • (ArgumentError)


31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/maestro_common/mq/messenger.rb', line 31

# Builds a queue wrapper with an empty handler table, marked as not connected
# and with EventMachine deferral enabled.
#
# @param name [String] queue name; must be non-nil and match VALID_QUEUE_REGEX
# @param default_handler [Object, nil] handler registered via #register_handler
#   and consulted for messages whose type has no registered handler; when nil,
#   such messages are dropped
# @param opts [Hash] stored as-is; later passed to the subscribe call in
#   #connect and checked for :ack in #handle_incoming_message
# @raise [ArgumentError] if name is nil or does not match VALID_QUEUE_REGEX
def initialize(name, default_handler = nil, opts = {})
  # Use a literal label here: interpolating a nil name renders as an empty
  # string and would yield the unhelpful message " cannot be nil".
  raise ArgumentError, 'queue name cannot be nil' unless name

  unless name.match(/#{VALID_QUEUE_REGEX}/)
    raise ArgumentError, "#{name} must match the following regex #{VALID_QUEUE_REGEX}, '#{name}' does not"
  end

  self.handlers = {}
  self.connected = false
  self.default_handler = default_handler
  self.name = name
  self.opts = opts
  self.use_eventmachine = true

  if default_handler
    # default_handler is already assigned above, so register_handler can
    # report accurately what happens to unhandled messages.
    register_handler(default_handler)
  else
    Maestro.log.info "No default handler for queue '#{name}'.  Unknown messages will be dropped"
  end
end

Instance Attribute Details

#connected ⇒ Object

Returns the value of attribute connected.



28
29
30
# File 'lib/maestro_common/mq/messenger.rb', line 28

# Subscription flag for this queue: set to false by #initialize and
# #disconnect, and to true by #connect when a messenger is present.
def connected
  @connected
end

#default_handler ⇒ Object

Returns the value of attribute default_handler.



28
29
30
# File 'lib/maestro_common/mq/messenger.rb', line 28

# The fallback handler given to #initialize (may be nil). When it responds to
# on_unhandled_message, #handle_incoming_message sends it messages whose type
# has no usable registered handler.
def default_handler
  @default_handler
end

#handlers ⇒ Object

Returns the value of attribute handlers.



28
29
30
# File 'lib/maestro_common/mq/messenger.rb', line 28

# Hash mapping a message type to the handler registered for it; starts empty
# in #initialize and is filled by #register_handler.
def handlers
  @handlers
end

#messenger ⇒ Object

Returns the value of attribute messenger.



28
29
30
# File 'lib/maestro_common/mq/messenger.rb', line 28

# The messenger this queue delegates sends to and whose `debug` flag gates
# debug logging. It is not assigned anywhere in the visible methods —
# presumably set by the owning Messenger; TODO confirm. #connect does nothing
# while this is nil.
def messenger
  @messenger
end

#name ⇒ Object

Returns the value of attribute name.



28
29
30
# File 'lib/maestro_common/mq/messenger.rb', line 28

# The queue name passed to #initialize, where it is validated against
# VALID_QUEUE_REGEX.
def name
  @name
end

#opts ⇒ Object

Returns the value of attribute opts.



28
29
30
# File 'lib/maestro_common/mq/messenger.rb', line 28

# Options hash passed to #initialize. It is forwarded to the subscribe call in
# #connect, and #handle_incoming_message acks the message when opts[:ack] is
# 'client'.
def opts
  @opts
end

#use_eventmachine ⇒ Object

Returns the value of attribute use_eventmachine.



28
29
30
# File 'lib/maestro_common/mq/messenger.rb', line 28

# When truthy (the value #initialize assigns is true), the subscription block
# in #connect hands each message to EventMachine.defer; otherwise the message
# is handled inline in the subscription callback.
def use_eventmachine
  @use_eventmachine
end

Instance Method Details

#connect ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/maestro_common/mq/messenger.rb', line 136

# Subscribes this queue to incoming messages (once) and marks it connected.
# Does nothing, and returns nil, when no messenger is set.
#
# Delivery mode is decided per message from `use_eventmachine`:
# - truthy: processing is handed to EventMachine.defer so the receiver can move
#   on immediately. Deferred messages may run in parallel and therefore finish
#   in a different order than they arrived.
# - falsy: the message is processed inline, which preserves arrival order but
#   effectively blocks further incoming messages until processing completes.
def connect
  return unless messenger

  unless connected?
    Maestro::MQHelper.subscribe(@name, opts) do |message|
      if use_eventmachine
        EventMachine.defer { handle_incoming_message(message) }
      else
        handle_incoming_message(message)
      end
    end
  end

  self.connected = true
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


155
156
157
# File 'lib/maestro_common/mq/messenger.rb', line 155

# Predicate form of the `connected` attribute; returns its current value
# unchanged.
def connected?
  connected
end

#disconnect ⇒ Object



150
151
152
153
# File 'lib/maestro_common/mq/messenger.rb', line 150

# Unsubscribes from the queue when it has a name and is currently connected,
# then marks the queue as not connected in every case.
def disconnect
  subscribed = @name && connected?
  Maestro::MQHelper.unsubscribe(@name) if subscribed

  self.connected = false
end

#handle_incoming_message(message) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/maestro_common/mq/messenger.rb', line 71

# Processes one message delivered by the subscription created in #connect.
#
# Steps:
# 1. Drop the message if its 'expires' header (epoch milliseconds) is in the
#    past; a missing or non-numeric header converts to 0 and never expires.
# 2. Otherwise parse the body as JSON, stamp this queue's name under
#    '__msg_from__', and look up a handler by '__msg_type__' ('-legacy-' when
#    the key is absent).
# 3. Call the handler's on_incoming_message with 3 or 4 arguments depending on
#    its arity, or fall back to default_handler.on_unhandled_message.
#
# Every exception is logged and swallowed, so one bad message cannot break the
# subscription. When opts[:ack] is 'client', the message is acked regardless
# of the outcome.
#
# @param message [#body, #headers] raw message; body is a JSON string and
#   headers is indexable by 'expires'
def handle_incoming_message(message)
  # All consumers expect JSON, so it is parsed once here; handlers receive the
  # parsed content and never need to know the wire encoding.
  begin
    Maestro.log.debug "Received Message #{message.body}" if messenger.debug

    # Only pass messages that have not expired (both values are epoch millis).
    now = Time.now.to_i * 1000
    expiration = message.headers['expires'].to_i

    if expiration > 0 && now > expiration
      timing = "Message expired at: #{expiration} [#{Time.at expiration/1000}], Current time: #{now} [#{Time.at now/1000}]"
      Maestro.log.warn "Skipping agent queue expired message. #{timing}: #{message.body}"
      # Message is dropped.
    else
      hash = JSON.parse(message.body)
      msg_type = hash['__msg_type__'] || '-legacy-'
      hash['__msg_from__'] = name
      handler = handlers[msg_type]

      if handler && handler.respond_to?(:on_incoming_message)
        # Depending on arity, send either (self, type, content) or
        # (self, type, content, raw).
        arity = handler.method(:on_incoming_message).arity
        case arity
        when 3
          handler.on_incoming_message(self, msg_type, hash['__msg_content__'])
        when 4
          handler.on_incoming_message(self, msg_type, hash['__msg_content__'], hash)
        else
          # Optional or splat parameters give a negative arity; without this
          # branch such handlers would drop the message with no trace.
          Maestro.log.warn "Handler '#{handler.class.name}' for '#{msg_type}' has unsupported on_incoming_message arity #{arity} (expected 3 or 4); message not delivered: #{hash}"
        end
      else
        if default_handler && default_handler.respond_to?(:on_unhandled_message)
          default_handler.on_unhandled_message(self, msg_type, hash['__msg_content__'], hash)
        else
          Maestro.log.warn "Dumping unhandled '#{msg_type}' message (no default handler): #{hash}"
        end
      end

      Maestro.log.debug("Processed #{msg_type}: #{message.body}") if messenger.debug
    end
  rescue Exception => e
    # NOTE(review): rescuing Exception is intentionally broad here so that any
    # failure in a handler is logged and the ack below still runs; it also
    # swallows SystemExit/Interrupt raised by handlers. Confirm before narrowing.
    backtrace = e.backtrace.join("\n")
    Maestro.log.error "Error processing message - #{e.class}:#{e}\n#{backtrace}"
  ensure
    # Client-ack mode: acknowledge even on failure so the message is not
    # left unacknowledged.
    if opts && opts[:ack] == 'client'
      Maestro::MQHelper.connection.ack(message)
    end
  end
end

#register_handler(handler, msg_types = nil) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/maestro_common/mq/messenger.rb', line 49

# Associates a handler with one or more message types on this queue.
#
# @param handler [Object] object to receive messages; asked for its types via
#   get_handled_message_types when msg_types is not supplied
# @param msg_types [Array, nil] message types to route to the handler
#
# When there are no types to register, only a debug line is logged describing
# what will happen to messages (forwarded to the default handler or dropped).
def register_handler(handler, msg_types = nil)
  msg_types ||= handler.get_handled_message_types

  if !msg_types || msg_types.empty?
    fate = default_handler ? 'sent to "on_unhandled_message"' : 'dropped'
    Maestro.log.debug "No message types registered for queue '#{name}' all messages will be #{fate}"
  else
    msg_types.each do |type|
      handlers[type] = handler
      Maestro.log.debug "Registering message type '#{type}' to handler '#{handler.class.name}' on queue #{name}"
    end
  end
end

#send_message_async(message, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object Also known as: send_message



62
63
64
# File 'lib/maestro_common/mq/messenger.rb', line 62

# Delegates to messenger.send_message_async, supplying this queue's name as
# the first argument. The optional block is forwarded as an ordinary
# positional argument (a Proc or nil), not re-passed as a block.
def send_message_async(message, options = DEFAULT_SEND_OPTIONS, &block)
  messenger.send_message_async(name, message, options, block)
end

#send_message_sync(message, timeout = DEFAULT_SYNC_TIMEOUT, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object



67
68
69
# File 'lib/maestro_common/mq/messenger.rb', line 67

# Delegates to messenger.send_message_sync, supplying this queue's name as
# the first argument, followed by the message, timeout and options. The
# optional block is forwarded as an ordinary positional argument (a Proc or
# nil), not re-passed as a block.
def send_message_sync(message, timeout = DEFAULT_SYNC_TIMEOUT, options = DEFAULT_SEND_OPTIONS, &block)
  messenger.send_message_sync(name, message, timeout, options, block)
end