Class: Maestro::Messenger::Queue
- Inherits:
-
Object
- Object
- Maestro::Messenger::Queue
- Defined in:
- lib/maestro_common/mq/messenger.rb
Instance Attribute Summary
-
#connected ⇒ Object
Returns the value of attribute connected.
-
#default_handler ⇒ Object
Returns the value of attribute default_handler.
-
#handlers ⇒ Object
Returns the value of attribute handlers.
-
#messenger ⇒ Object
Returns the value of attribute messenger.
-
#name ⇒ Object
Returns the value of attribute name.
-
#opts ⇒ Object
Returns the value of attribute opts.
-
#use_eventmachine ⇒ Object
Returns the value of attribute use_eventmachine.
Instance Method Summary
- #connect ⇒ Object
- #connected? ⇒ Boolean
- #disconnect ⇒ Object
- #handle_incoming_message(message) ⇒ Object
-
#initialize(name, default_handler = nil, opts = {}) ⇒ Queue
constructor
Minimum requirements - a name, and a handler for messages we don’t know about.
- #register_handler(handler, msg_types = nil) ⇒ Object
- #send_message_async(message, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object (also: #send_message)
- #send_message_sync(message, timeout = DEFAULT_SYNC_TIMEOUT, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object
Constructor Details
#initialize(name, default_handler = nil, opts = {}) ⇒ Queue
Minimum requirements - a name, and a handler for messages we don’t know about
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/maestro_common/mq/messenger.rb', line 31 def initialize(name, default_handler = nil, opts = {}) raise ArgumentError, "#{name} cannot be nil" unless name raise ArgumentError, "#{name} must match the following regex #{VALID_QUEUE_REGEX}, '#{name}' does not" unless name.match(/#{VALID_QUEUE_REGEX}/) self.handlers = {} self.connected = false self.default_handler = default_handler self.name = name self.opts = opts self.use_eventmachine = true if default_handler register_handler(default_handler) else Maestro.log.info "No default handler for queue '#{name}'. Unknown messages will be dropped" end end |
Instance Attribute Details
#connected ⇒ Object
Returns the value of attribute connected.
28 29 30 |
# File 'lib/maestro_common/mq/messenger.rb', line 28 def connected @connected end |
#default_handler ⇒ Object
Returns the value of attribute default_handler.
28 29 30 |
# File 'lib/maestro_common/mq/messenger.rb', line 28 def default_handler @default_handler end |
#handlers ⇒ Object
Returns the value of attribute handlers.
28 29 30 |
# File 'lib/maestro_common/mq/messenger.rb', line 28 def handlers @handlers end |
#messenger ⇒ Object
Returns the value of attribute messenger.
28 29 30 |
# File 'lib/maestro_common/mq/messenger.rb', line 28 def messenger @messenger end |
#name ⇒ Object
Returns the value of attribute name.
28 29 30 |
# File 'lib/maestro_common/mq/messenger.rb', line 28 def name @name end |
#opts ⇒ Object
Returns the value of attribute opts.
28 29 30 |
# File 'lib/maestro_common/mq/messenger.rb', line 28 def opts @opts end |
#use_eventmachine ⇒ Object
Returns the value of attribute use_eventmachine.
28 29 30 |
# File 'lib/maestro_common/mq/messenger.rb', line 28 def use_eventmachine @use_eventmachine end |
Instance Method Details
#connect ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/maestro_common/mq/messenger.rb', line 136 def connect if messenger # If eventmachine enabled will use it to defer delivery so we can receive the message and continue with life # That can be an issue sometimes as different threads may execute at different rates, causing unintentional # bursts of chronometric radiation that can cause messages to appear to execute in non-sequential order. # (Actually they are, but since multiple may be executing in parallel it can have unintended consequences) # Set the 'use_eventmachine' property of the queue connection to 'false' to ensure message processing completes # in the order of reception. This will effectively block incoming messages, so the consequences of that should # be taken into account. Maestro::MQHelper.subscribe(@name, opts) { |message| use_eventmachine ? EventMachine.defer { handle_incoming_message(message) } : handle_incoming_message(message) } unless connected? self.connected = true end end |
#connected? ⇒ Boolean
155 156 157 |
# File 'lib/maestro_common/mq/messenger.rb', line 155 def connected? return connected end |
#disconnect ⇒ Object
150 151 152 153 |
# File 'lib/maestro_common/mq/messenger.rb', line 150 def disconnect Maestro::MQHelper.unsubscribe(@name) if @name && connected? self.connected = false end |
#handle_incoming_message(message) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/maestro_common/mq/messenger.rb', line 71 def handle_incoming_message(message) # All consumers expect message data to be json and parse... maybe just parse it here and pass parsed data # then if we use different encoding noone will be any the wiser begin Maestro.log.debug "Received Message #{message.body}" if messenger.debug # Only pass messages that have not expired now = Time.now.to_i * 1000 expiration = message.headers['expires'].to_i if (expiration > 0 && now > expiration) timing = "Message expired at: #{expiration} [#{Time.at expiration/1000}], Current time: #{now} [#{Time.at now/1000}]" Maestro.log.warn "Skipping agent queue expired message. #{timing}: #{message.body}" # drop message else hash = JSON.parse(message.body) msg_type = hash['__msg_type__'] || '-legacy-' hash['__msg_from__'] = name handler = handlers[msg_type] if handler && handler.respond_to?(:on_incoming_message) # Depending on arity, send either (self, type, content) or (self, type, content, raw) case handler.method(:on_incoming_message).arity when 3 handler.on_incoming_message(self, msg_type, hash['__msg_content__']) when 4 handler.on_incoming_message(self, msg_type, hash['__msg_content__'], hash) end else if default_handler && default_handler.respond_to?(:on_unhandled_message) default_handler.on_unhandled_message(self, msg_type, hash['__msg_content__'], hash) else Maestro.log.warn "Dumping unhandled '#{msg_type}' message (no default handler): #{hash}" end end Maestro.log.debug("Processed #{msg_type}: #{message.body}") if messenger.debug end rescue Exception => e # something happened executing processing the message/running the plugin backtrace = e.backtrace.join("\n") # set the error with the exception message and backtrace Maestro.log.error "Error processing message - #{e.class}:#{e}\n#{backtrace}" ensure if opts && opts[:ack] == 'client' Maestro::MQHelper.connection.ack(message) end end end |
#register_handler(handler, msg_types = nil) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/maestro_common/mq/messenger.rb', line 49 def register_handler(handler, msg_types = nil) msg_types = handler. unless msg_types if msg_types && !msg_types.empty? msg_types.each do |t| handlers[t] = handler Maestro.log.debug "Registering message type '#{t}' to handler '#{handler.class.name}' on queue #{name}" end else Maestro.log.debug "No message types registered for queue '#{name}' all messages will be #{default_handler ? 'sent to "on_unhandled_message"' : 'dropped'}" end end |
#send_message_async(message, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object Also known as: send_message
62 63 64 |
# File 'lib/maestro_common/mq/messenger.rb', line 62 def send_message_async(message, options = DEFAULT_SEND_OPTIONS, &block) messenger.send_message_async(name, message, options, block) end |
#send_message_sync(message, timeout = DEFAULT_SYNC_TIMEOUT, options = DEFAULT_SEND_OPTIONS, &block) ⇒ Object
67 68 69 |
# File 'lib/maestro_common/mq/messenger.rb', line 67 def send_message_sync(message, timeout = DEFAULT_SYNC_TIMEOUT, options = DEFAULT_SEND_OPTIONS, &block) messenger.send_message_sync(name, message, timeout, options, block) end |