Class: Beetle::Message
- Inherits:
- Object
- Includes:
- Logging
- Defined in:
- lib/beetle/message.rb
Overview
Instances of class Message are created when a subscription callback fires. Class Message contains the code responsible for message deduplication and for deciding whether to retry the message handler after a handler has crashed (or was forcefully aborted).
Constant Summary
- FORMAT_VERSION =
current message format version
1
- FLAG_REDUNDANT =
flag for encoding redundant messages
1
- DEFAULT_TTL =
default lifetime of messages
1.day
- DEFAULT_HANDLER_TIMEOUT =
forcefully abort a running handler after this many seconds. can be overridden when registering a handler.
600.seconds
- TIMEOUT_GRACE_PERIOD =
How much extra time on top of the handler timeout we add before considering a handler timed out
10.seconds
- DEFAULT_HANDLER_EXECUTION_ATTEMPTS =
how many times we should try to run a handler before giving up
1
- DEFAULT_HANDLER_EXECUTION_ATTEMPTS_DELAY =
how many seconds we should wait before retrying handler execution
10.seconds
- DEFAULT_EXCEPTION_LIMIT =
how many exceptions should be tolerated before giving up
0
Instance Attribute Summary
-
#attempts_limit ⇒ Object
readonly
how many times we should try to run the handler.
-
#data ⇒ Object
readonly
message payload.
-
#delay ⇒ Object
readonly
how long to wait before retrying the message handler.
-
#exception ⇒ Object
readonly
exception raised by handler execution.
-
#exceptions_limit ⇒ Object
readonly
how many exceptions we should tolerate before giving up.
-
#expires_at ⇒ Object
readonly
unix timestamp after which the message should be considered stale.
-
#flags ⇒ Object
readonly
flags sent with the message.
-
#format_version ⇒ Object
readonly
the message format version of the message.
-
#handler_result ⇒ Object
readonly
value returned by handler execution.
-
#header ⇒ Object
readonly
the AMQP header received with the message.
-
#max_delay ⇒ Object
readonly
maximum wait time for message handler retries (uses exponential backoff).
-
#queue ⇒ Object
readonly
name of the queue on which the message was received.
-
#retry_on ⇒ Object
readonly
array of exceptions accepted to be rescued and retried.
-
#server ⇒ Object
readonly
server from which the message was received.
-
#timeout ⇒ Object
readonly
how many seconds the handler is allowed to execute.
-
#timestamp ⇒ Object
readonly
unix timestamp when the message was published.
-
#uuid ⇒ Object
readonly
the uuid of the message.
Class Method Summary
-
.generate_uuid ⇒ Object
generate uuid for publishing.
-
.now ⇒ Object
current time (UNIX timestamp).
-
.publishing_options(opts = {}) ⇒ Object
build hash with options for the publisher.
Instance Method Summary
-
#aquire_mutex! ⇒ Object
acquire execution mutex before we run the handler (and delete it if we can’t acquire it).
-
#attempts ⇒ Object
how many times we already tried running the handler.
-
#attempts_limit_reached?(attempts = nil) ⇒ Boolean
whether we have already tried running the handler as often as specified when the handler was registered.
-
#completed! ⇒ Object
mark message handling complete in the deduplication store.
-
#completed? ⇒ Boolean
message handling completed?.
-
#decode ⇒ Object
extracts various values from the AMQP header properties.
-
#delayed?(t = nil) ⇒ Boolean
whether we should wait before running the handler.
-
#delete_mutex! ⇒ Object
delete execution mutex.
- #exception_accepted? ⇒ Boolean
-
#exceptions_limit_reached?(exceptions = nil) ⇒ Boolean
whether the number of exceptions has exceeded the limit set when the handler was registered.
-
#expired? ⇒ Boolean
a message has expired if the header expiration timestamp is smaller than the current time.
- #fetch_status_delay_timeout_attempts_exceptions ⇒ Object
-
#increment_exception_count! ⇒ Object
increment number of exception occurrences in the deduplication store.
-
#increment_execution_attempts! ⇒ Object
record the fact that we are trying to run the handler.
-
#initialize(queue, header, body, opts = {}) ⇒ Message
constructor
A new instance of Message.
-
#key_exists? ⇒ Boolean
have we already seen this message? if not, set the status to “incomplete” and store the message expiration timestamp in the deduplication store.
-
#msg_id ⇒ Object
unique message id.
-
#now ⇒ Object
current time (UNIX timestamp).
-
#process(handler) ⇒ Object
process this message and do not allow any exception to escape to the caller.
-
#redundant? ⇒ Boolean
whether the publisher has tried sending this message to two servers.
-
#routing_key ⇒ Object
(also: #key)
the routing key.
-
#set_delay! ⇒ Object
store delay value in the deduplication store.
-
#set_timeout! ⇒ Object
store handler timeout timestamp in the deduplication store.
-
#setup(opts) ⇒ Object
applies handler options and their defaults (timeout, delay, limits, retry_on, store).
-
#simple? ⇒ Boolean
whether this is a message we can process without accessing the deduplication store.
-
#timed_out! ⇒ Object
reset handler timeout in the deduplication store.
-
#timed_out?(t = nil) ⇒ Boolean
handler timed out?.
Methods included from Logging
Constructor Details
#initialize(queue, header, body, opts = {}) ⇒ Message
Returns a new instance of Message.
    # File 'lib/beetle/message.rb', line 65
    def initialize(queue, header, body, opts = {})
      @queue = queue
      @header = header
      @data = body
      setup(opts)
      decode
    end
Instance Attribute Details
#attempts_limit ⇒ Object (readonly)
how many times we should try to run the handler
    # File 'lib/beetle/message.rb', line 55
    def attempts_limit
      @attempts_limit
    end
#data ⇒ Object (readonly)
message payload
    # File 'lib/beetle/message.rb', line 41
    def data
      @data
    end
#delay ⇒ Object (readonly)
how long to wait before retrying the message handler
    # File 'lib/beetle/message.rb', line 51
    def delay
      @delay
    end
#exception ⇒ Object (readonly)
exception raised by handler execution
    # File 'lib/beetle/message.rb', line 61
    def exception
      @exception
    end
#exceptions_limit ⇒ Object (readonly)
how many exceptions we should tolerate before giving up
    # File 'lib/beetle/message.rb', line 57
    def exceptions_limit
      @exceptions_limit
    end
#expires_at ⇒ Object (readonly)
unix timestamp after which the message should be considered stale
    # File 'lib/beetle/message.rb', line 47
    def expires_at
      @expires_at
    end
#flags ⇒ Object (readonly)
flags sent with the message
    # File 'lib/beetle/message.rb', line 45
    def flags
      @flags
    end
#format_version ⇒ Object (readonly)
the message format version of the message
    # File 'lib/beetle/message.rb', line 43
    def format_version
      @format_version
    end
#handler_result ⇒ Object (readonly)
value returned by handler execution
    # File 'lib/beetle/message.rb', line 63
    def handler_result
      @handler_result
    end
#header ⇒ Object (readonly)
the AMQP header received with the message
    # File 'lib/beetle/message.rb', line 35
    def header
      @header
    end
#max_delay ⇒ Object (readonly)
maximum wait time for message handler retries (uses exponential backoff)
    # File 'lib/beetle/message.rb', line 53
    def max_delay
      @max_delay
    end
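The exponential backoff mentioned for max_delay could look roughly like the sketch below. The retry delay computation itself is not shown on this page, so the doubling rule and the helper name `backoff_delay` are assumptions made purely for illustration.

```ruby
# Hypothetical helper (not part of Beetle): doubles the base delay with each
# attempt and caps the result at max_delay. When no max_delay is configured,
# the fixed delay is used.
def backoff_delay(delay, max_delay, attempt)
  return delay if max_delay.nil?
  [delay * (2**attempt), max_delay].min
end

fixed  = backoff_delay(10, nil, 3) # => 10
second = backoff_delay(10, 60, 2)  # => 40
capped = backoff_delay(10, 60, 5)  # => 60
```

The cap keeps a long run of failures from pushing the next retry arbitrarily far into the future.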
#queue ⇒ Object (readonly)
name of the queue on which the message was received
    # File 'lib/beetle/message.rb', line 33
    def queue
      @queue
    end
#retry_on ⇒ Object (readonly)
array of exceptions accepted to be rescued and retried
    # File 'lib/beetle/message.rb', line 59
    def retry_on
      @retry_on
    end
#server ⇒ Object (readonly)
server from which the message was received
    # File 'lib/beetle/message.rb', line 31
    def server
      @server
    end
#timeout ⇒ Object (readonly)
how many seconds the handler is allowed to execute
    # File 'lib/beetle/message.rb', line 49
    def timeout
      @timeout
    end
#timestamp ⇒ Object (readonly)
unix timestamp when the message was published
    # File 'lib/beetle/message.rb', line 39
    def timestamp
      @timestamp
    end
#uuid ⇒ Object (readonly)
the uuid of the message
    # File 'lib/beetle/message.rb', line 37
    def uuid
      @uuid
    end
Class Method Details
.generate_uuid ⇒ Object
generate uuid for publishing
    # File 'lib/beetle/message.rb', line 152
    def self.generate_uuid
      SecureRandom.uuid
    end
.now ⇒ Object
current time (UNIX timestamp)
    # File 'lib/beetle/message.rb', line 142
    def self.now #:nodoc:
      Time.now.to_i
    end
.publishing_options(opts = {}) ⇒ Object
build hash with options for the publisher
    # File 'lib/beetle/message.rb', line 101
    def self.publishing_options(opts = {}) #:nodoc:
      flags = 0
      flags |= FLAG_REDUNDANT if opts[:redundant]
      expires_at = now + (opts[:ttl] || DEFAULT_TTL).to_i
      opts = opts.slice(*PUBLISHING_KEYS)
      opts[:message_id] = generate_uuid.to_s
      opts[:timestamp] = now
      headers = {}
      headers.merge!(opts[:headers]) if opts[:headers]
      headers.reject! {|k,v| v.nil? }
      headers.each {|k,v| headers[k] = v.to_s if v.is_a?(Symbol) }
      headers.merge!(
        :format_version => FORMAT_VERSION.to_s,
        :flags => flags.to_s,
        :expires_at => expires_at.to_s
      )
      opts[:headers] = headers
      opts
    end
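To make the header handling easier to follow, here is a simplified, dependency-free sketch of the header-building part only. The helper name `sketch_headers` is hypothetical, the constant values are copied from the constant summary (with 1.day written as 86400 seconds), and the current time is passed in explicitly so the result is deterministic.

```ruby
# Simplified stand-in for the header-building part of publishing_options.
# Not Beetle's API: it skips PUBLISHING_KEYS filtering and uuid generation.
def sketch_headers(opts, now, flag_redundant: 1, format_version: 1, default_ttl: 86_400)
  flags = 0
  flags |= flag_redundant if opts[:redundant]
  # Drop nil values and stringify symbol values, as the original does.
  headers = (opts[:headers] || {}).reject { |_k, v| v.nil? }
  headers = headers.transform_values { |v| v.is_a?(Symbol) ? v.to_s : v }
  headers.merge(
    format_version: format_version.to_s,
    flags: flags.to_s,
    expires_at: (now + (opts[:ttl] || default_ttl).to_i).to_s
  )
end

h = sketch_headers({ redundant: true, ttl: 60, headers: { source: :api, trace: nil } }, 1_000)
# h => { source: "api", format_version: "1", flags: "1", expires_at: "1060" }
```

All three protocol headers are sent as strings, which is why decode converts them back with to_i.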
Instance Method Details
#aquire_mutex! ⇒ Object
acquire execution mutex before we run the handler (and delete it if we can’t acquire it).
    # File 'lib/beetle/message.rb', line 241
    def aquire_mutex!
      if mutex = @store.setnx(msg_id, :mutex, now)
        logger.debug "Beetle: aquired mutex: #{msg_id}"
      else
        delete_mutex!
      end
      mutex
    end
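The control flow above can be tried out against a plain Hash. This is only a sketch: the `setnx` and `try_mutex` helpers and the `"<msg_id>:mutex"` key layout are assumptions for illustration and say nothing about how the real deduplication store is implemented.

```ruby
# Hypothetical in-memory stand-in for the store's setnx: sets the key only if
# it is absent and reports whether it was set.
def setnx(store, key, value)
  return false if store.key?(key)
  store[key] = value
  true
end

# Mirrors aquire_mutex!: a failed attempt deletes the existing mutex.
def try_mutex(store, msg_id, now)
  key = "#{msg_id}:mutex"
  acquired = setnx(store, key, now)
  store.delete(key) unless acquired
  acquired
end

store  = {}
first  = try_mutex(store, "msgid:q:1", 100) # true: mutex taken
second = try_mutex(store, "msgid:q:1", 101) # false: mutex existed and was deleted
third  = try_mutex(store, "msgid:q:1", 102) # true: mutex is free again
```

Deleting the mutex on a failed attempt means a contended message is not locked out indefinitely: the next attempt can take the mutex.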
#attempts ⇒ Object
how many times we already tried running the handler
    # File 'lib/beetle/message.rb', line 202
    def attempts
      @store.get(msg_id, :attempts).to_i
    end
#attempts_limit_reached?(attempts = nil) ⇒ Boolean
whether we have already tried running the handler as often as specified when the handler was registered
    # File 'lib/beetle/message.rb', line 212
    def attempts_limit_reached?(attempts = nil)
      (attempts ||= @store.get(msg_id, :attempts)) && attempts.to_i >= attempts_limit
    end
#completed! ⇒ Object
mark message handling complete in the deduplication store
    # File 'lib/beetle/message.rb', line 187
    def completed!
      @store.mset(msg_id, :status => "completed", :timeout => 0)
    end
#completed? ⇒ Boolean
message handling completed?
    # File 'lib/beetle/message.rb', line 182
    def completed?
      @store.get(msg_id, :status) == "completed"
    end
#decode ⇒ Object
extracts various values from the AMQP header properties
    # File 'lib/beetle/message.rb', line 87
    def decode #:nodoc:
      amqp_headers = header.attributes
      @uuid = amqp_headers[:message_id]
      @timestamp = amqp_headers[:timestamp]
      headers = amqp_headers[:headers].symbolize_keys
      @format_version = headers[:format_version].to_i
      @flags = headers[:flags].to_i
      @expires_at = headers[:expires_at].to_i
    rescue Exception => @exception
      Beetle::reraise_expectation_errors!
      logger.error "Could not decode message. #{self.inspect}"
    end
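As a rough illustration of what decode extracts, the sketch below runs the same conversions on a hand-built attributes hash. The helper `decode_attributes` and the sample values are hypothetical, and `transform_keys` stands in for ActiveSupport's `symbolize_keys` so the example needs no extra libraries.

```ruby
# Simplified decode: pulls typed values out of an AMQP attributes hash.
def decode_attributes(attributes)
  headers = attributes[:headers].transform_keys(&:to_sym)
  {
    uuid: attributes[:message_id],
    timestamp: attributes[:timestamp],
    format_version: headers[:format_version].to_i,
    flags: headers[:flags].to_i,
    expires_at: headers[:expires_at].to_i
  }
end

# Hypothetical attributes, shaped like the hash the method above reads.
attributes = {
  message_id: "abc-123",
  timestamp: 1_000,
  headers: { "format_version" => "1", "flags" => "1", "expires_at" => "1060" }
}
decoded = decode_attributes(attributes)
# decoded[:flags] => 1, decoded[:expires_at] => 1060
```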
#delayed?(t = nil) ⇒ Boolean
whether we should wait before running the handler
    # File 'lib/beetle/message.rb', line 192
    def delayed?(t = nil)
      (t ||= @store.get(msg_id, :delay)) && t.to_i > now
    end
#delete_mutex! ⇒ Object
delete execution mutex
    # File 'lib/beetle/message.rb', line 251
    def delete_mutex!
      @store.del(msg_id, :mutex)
      logger.debug "Beetle: deleted mutex: #{msg_id}"
    end
#exception_accepted? ⇒ Boolean
    # File 'lib/beetle/message.rb', line 226
    def exception_accepted?
      @exception.nil? || retry_on.nil? || retry_on.any?{ |klass| @exception.is_a? klass}
    end
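The three ways an exception can be accepted are easier to see with concrete inputs. The standalone function below takes the exception and the retry_on list as arguments instead of reading instance state; apart from that it is meant to mirror the expression above.

```ruby
# Accept when nothing was raised, when no retry_on list was configured, or
# when the exception is an instance of one of the listed classes.
def exception_accepted?(exception, retry_on)
  exception.nil? || retry_on.nil? || retry_on.any? { |klass| exception.is_a?(klass) }
end

no_error   = exception_accepted?(nil, [IOError])               # => true
no_list    = exception_accepted?(ArgumentError.new, nil)       # => true
subclass   = exception_accepted?(EOFError.new, [IOError])      # => true, EOFError inherits from IOError
not_listed = exception_accepted?(ArgumentError.new, [IOError]) # => false
```

Because the check uses is_a?, listing a base class also accepts all of its subclasses.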
#exceptions_limit_reached?(exceptions = nil) ⇒ Boolean
whether the number of exceptions has exceeded the limit set when the handler was registered
    # File 'lib/beetle/message.rb', line 222
    def exceptions_limit_reached?(exceptions = nil)
      (exceptions ||= @store.get(msg_id, :exceptions)) && exceptions.to_i > exceptions_limit
    end
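Note that the two limit checks compare differently: attempts use `>=` while exceptions use `>`. The sketch below restates both as standalone functions (the explicit arguments are an adaptation for illustration) and feeds them string values, assuming counters read back from a store arrive as strings.

```ruby
# Attempts: the limit is reached once the count equals the limit.
def attempts_limit_reached?(attempts, attempts_limit)
  !attempts.nil? && attempts.to_i >= attempts_limit
end

# Exceptions: the limit is reached only once the count exceeds the limit.
def exceptions_limit_reached?(exceptions, exceptions_limit)
  !exceptions.nil? && exceptions.to_i > exceptions_limit
end

one_attempt    = attempts_limit_reached?("1", 1)   # => true
no_exceptions  = exceptions_limit_reached?("0", 0) # => false
one_exception  = exceptions_limit_reached?("1", 0) # => true
nothing_stored = attempts_limit_reached?(nil, 1)   # => false
```

With the defaults (one attempt, zero tolerated exceptions), a single failing run therefore exhausts both limits.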
#expired? ⇒ Boolean
a message has expired if the header expiration timestamp is smaller than the current time
    # File 'lib/beetle/message.rb', line 147
    def expired?
      @expires_at < now
    end
#fetch_status_delay_timeout_attempts_exceptions ⇒ Object
    # File 'lib/beetle/message.rb', line 256
    def fetch_status_delay_timeout_attempts_exceptions
      @store.mget(msg_id, [:status, :delay, :timeout, :attempts, :exceptions])
    end
#increment_exception_count! ⇒ Object
increment number of exception occurrences in the deduplication store
    # File 'lib/beetle/message.rb', line 217
    def increment_exception_count!
      @store.incr(msg_id, :exceptions)
    end
#increment_execution_attempts! ⇒ Object
record the fact that we are trying to run the handler
    # File 'lib/beetle/message.rb', line 207
    def increment_execution_attempts!
      @store.incr(msg_id, :attempts)
    end
#key_exists? ⇒ Boolean
have we already seen this message? if not, set the status to “incomplete” and store the message expiration timestamp in the deduplication store.
    # File 'lib/beetle/message.rb', line 232
    def key_exists?
      old_message = !@store.msetnx(msg_id, :status =>"incomplete", :expires => @expires_at.to_i, :timeout => (now + timeout).to_i)
      if old_message
        logger.debug "Beetle: received duplicate message: #{msg_id} on queue: #{@queue}"
      end
      old_message
    end
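The duplicate check relies on a "set all fields only if none exists" operation. A minimal sketch of that idea against a plain Hash follows; the `msetnx` and `seen_before?` helpers and the key layout are assumptions for illustration, not the real store implementation.

```ruby
# Hypothetical stand-in for msetnx: writes all fields only if none of the
# keys exists yet, and reports whether it wrote them.
def msetnx(store, msg_id, values)
  keys = values.keys.map { |suffix| "#{msg_id}:#{suffix}" }
  return false if keys.any? { |k| store.key?(k) }
  values.each { |suffix, v| store["#{msg_id}:#{suffix}"] = v }
  true
end

# Mirrors key_exists?: the message is a duplicate when the write was refused.
def seen_before?(store, msg_id, expires_at, timeout_at)
  !msetnx(store, msg_id, { status: "incomplete", expires: expires_at, timeout: timeout_at })
end

store = {}
first_delivery  = seen_before?(store, "msgid:q:1", 2_000, 1_600) # => false
second_delivery = seen_before?(store, "msgid:q:1", 2_000, 1_600) # => true
```

The first delivery records the message as incomplete; any later delivery of the same message id is recognised as a duplicate.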
#msg_id ⇒ Object
unique message id. used to form various keys in the deduplication store.
    # File 'lib/beetle/message.rb', line 132
    def msg_id
      @msg_id ||= "msgid:#{queue}:#{uuid}"
    end
#now ⇒ Object
current time (UNIX timestamp)
    # File 'lib/beetle/message.rb', line 137
    def now #:nodoc:
      Time.now.to_i
    end
#process(handler) ⇒ Object
process this message and do not allow any exception to escape to the caller
    # File 'lib/beetle/message.rb', line 261
    def process(handler)
      result = nil
      begin
        # pre_process might set up log routing and it might raise
        handler.pre_process(self)
      rescue Exception => @pre_exception
        Beetle::reraise_expectation_errors!
        logger.error "Beetle: preprocessing error #{@pre_exception.class}(#{@pre_exception}) for #{msg_id}"
      end
      logger.debug "Beetle: processing message #{msg_id}(#{}) redelivered: #{header.redelivered?}"
      begin
        result = process_internal(handler)
        handler.process_exception(@exception || @pre_exception) if (@exception || @pre_exception)
        handler.process_failure(result) if result.failure?
      rescue Exception => e
        Beetle::reraise_expectation_errors!
        logger.warn "Beetle: exception '#{e}' during processing of message #{msg_id}"
        logger.warn "Beetle: backtrace: #{e.backtrace.join("\n")}"
        result = RC::InternalError
      end
      result
    end
#redundant? ⇒ Boolean
whether the publisher has tried sending this message to two servers
    # File 'lib/beetle/message.rb', line 157
    def redundant?
      @flags & FLAG_REDUNDANT == FLAG_REDUNDANT
    end
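The check above is a standard bit-mask test. As a small standalone sketch (the function signature is an adaptation for illustration; the flag value 1 comes from the constant summary):

```ruby
# True when the redundancy bit is set, regardless of any other bits.
# In Ruby, & binds tighter than ==, so no extra parentheses are needed.
def redundant?(flags, flag_redundant = 1)
  flags & flag_redundant == flag_redundant
end

plain      = redundant?(0) # => false
redundant  = redundant?(1) # => true
other_bits = redundant?(3) # => true, the redundancy bit is still set
```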
#routing_key ⇒ Object Also known as: key
the routing key
    # File 'lib/beetle/message.rb', line 122
    def routing_key
      @routing_key ||= if x_death = header.attributes[:headers]["x-death"]
        x_death.last["routing-keys"].first
      else
        header.routing_key
      end
    end
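The following sketch shows the same lookup on hand-built header hashes. The function `original_routing_key`, the queue names and the routing keys are all made up for illustration; only the `"x-death"` and `"routing-keys"` keys are taken from the code above.

```ruby
# Prefer the first routing key of the last x-death entry; fall back to the
# routing key the message currently carries.
def original_routing_key(headers, current_key)
  if (x_death = headers["x-death"])
    x_death.last["routing-keys"].first
  else
    current_key
  end
end

dead_lettered = { "x-death" => [{ "queue" => "work", "routing-keys" => ["orders.created"] }] }
from_x_death = original_routing_key(dead_lettered, "work_dead_letter") # => "orders.created"
from_header  = original_routing_key({}, "orders.created")              # => "orders.created"
```

This way a handler sees the same routing key whether the message arrived directly or came back after being dead-lettered.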
#set_delay! ⇒ Object
store delay value in the deduplication store
    # File 'lib/beetle/message.rb', line 197
    def set_delay!
      @store.set(msg_id, :delay, now + next_delay(attempts))
    end
#set_timeout! ⇒ Object
store handler timeout timestamp in the deduplication store
    # File 'lib/beetle/message.rb', line 167
    def set_timeout!
      @store.set(msg_id, :timeout, (now + timeout).ceil)
    end
#setup(opts) ⇒ Object
applies handler options and their defaults (timeout, delay, limits, retry_on, store)
    # File 'lib/beetle/message.rb', line 73
    def setup(opts) #:nodoc:
      @server = opts[:server]
      @timeout = opts[:timeout] || DEFAULT_HANDLER_TIMEOUT.to_i
      @delay = (opts[:delay] || DEFAULT_HANDLER_EXECUTION_ATTEMPTS_DELAY).ceil
      @attempts_limit = opts[:attempts] || DEFAULT_HANDLER_EXECUTION_ATTEMPTS
      @exceptions_limit = opts[:exceptions] || DEFAULT_EXCEPTION_LIMIT
      @attempts_limit = @exceptions_limit + 1 if @attempts_limit <= @exceptions_limit
      @retry_on = opts[:retry_on] || nil
      @store = opts[:store]
      max_delay = opts[:max_delay] || @delay
      @max_delay = max_delay.ceil if max_delay >= 2*@delay
    end
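Two details of setup are easy to miss: the attempts limit is raised so that it always exceeds the exceptions limit, and max_delay is only kept when it is at least twice the delay. The sketch below isolates both rules; the helper names are hypothetical and the defaults are copied from the constant summary.

```ruby
# Attempts must exceed tolerated exceptions, otherwise the exception budget
# could never be used up.
def normalized_limits(opts)
  attempts   = opts[:attempts] || 1
  exceptions = opts[:exceptions] || 0
  attempts = exceptions + 1 if attempts <= exceptions
  [attempts, exceptions]
end

# max_delay is ignored (nil) unless it is at least twice the base delay.
def normalized_max_delay(opts, delay)
  max_delay = opts[:max_delay] || delay
  max_delay >= 2 * delay ? max_delay.ceil : nil
end

defaults  = normalized_limits({})                             # => [1, 0]
raised    = normalized_limits({ exceptions: 3 })              # => [4, 3]
untouched = normalized_limits({ attempts: 5, exceptions: 2 }) # => [5, 2]
too_small = normalized_max_delay({ max_delay: 15 }, 10)       # => nil
kept      = normalized_max_delay({ max_delay: 60 }, 10)       # => 60
```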
#simple? ⇒ Boolean
whether this is a message we can process without accessing the deduplication store
    # File 'lib/beetle/message.rb', line 162
    def simple?
      !redundant? && attempts_limit == 1
    end
#timed_out! ⇒ Object
reset handler timeout in the deduplication store
    # File 'lib/beetle/message.rb', line 177
    def timed_out!
      @store.set(msg_id, :timeout, 0)
    end
#timed_out?(t = nil) ⇒ Boolean
handler timed out?
    # File 'lib/beetle/message.rb', line 172
    def timed_out?(t = nil)
      (t ||= @store.get(msg_id, :timeout)) && (t.to_i + TIMEOUT_GRACE_PERIOD) < now
    end
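A short sketch of how the grace period affects the result. The standalone signature with an explicit `now` is an adaptation for illustration; the grace period of 10 seconds comes from TIMEOUT_GRACE_PERIOD in the constant summary.

```ruby
# A handler counts as timed out only once the stored timeout plus the grace
# period lies strictly in the past. A missing timeout never counts.
def timed_out?(stored_timeout, now, grace_period = 10)
  !stored_timeout.nil? && (stored_timeout.to_i + grace_period) < now
end

within_grace = timed_out?("1000", 1_005) # => false
at_boundary  = timed_out?("1000", 1_010) # => false, the comparison is strict
past_grace   = timed_out?("1000", 1_011) # => true
never_set    = timed_out?(nil, 1_011)    # => false
```

Since timed_out! stores a timeout of 0, a message reset that way is reported as timed out by this check.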