Class: Emissary::Message
Instance Attribute Summary collapse
-
#account ⇒ Object
Returns the value of attribute account.
-
#agent ⇒ Object
Returns the value of attribute agent.
-
#args ⇒ Object
Returns the value of attribute args.
-
#errors(type = :default) ⇒ Object
Returns the value of attribute errors.
-
#method ⇒ Object
Returns the value of attribute method.
-
#operation ⇒ Object
Returns the value of attribute operation.
-
#originator ⇒ Object
readonly
Returns the value of attribute originator.
-
#recipient ⇒ Object
Returns the value of attribute recipient.
-
#replyto ⇒ Object
Returns the value of attribute replyto.
-
#sender ⇒ Object
Returns the value of attribute sender.
-
#status ⇒ Object
Returns the value of attribute status.
-
#thread ⇒ Object
Returns the value of attribute thread.
-
#time ⇒ Object
Returns the value of attribute time.
-
#uuid ⇒ Object
readonly
Returns the value of attribute uuid.
Class Method Summary collapse
Instance Method Summary collapse
- #bounce(message = nil) ⇒ Object
- #canonical_route(who = :recipient) ⇒ Object
- #data ⇒ Object
- #encode ⇒ Object
- #error(message = nil) ⇒ Object
- #exchange(who = :recipient) ⇒ Object
- #exchange_name(who = :recipient) ⇒ Object
- #exchange_type(who = :recipient) ⇒ Object
- #headers ⇒ Object
-
#initialize(payload = {}) ⇒ Message
constructor
A new instance of Message.
- #response ⇒ Object
- #route(who = :recipient) ⇒ Object
- #routing_key(who = :recipient) ⇒ Object
- #stamp_received! ⇒ Object
- #stamp_sent! ⇒ Object
- #status_note ⇒ Object
- #status_note=(n) ⇒ Object
- #status_type ⇒ Object
- #status_type=(t) ⇒ Object
- #trip_time ⇒ Object
- #will_loop? ⇒ Boolean
Constructor Details
#initialize(payload = {}) ⇒ Message
Returns a new instance of Message.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/emissary/message.rb', line 27 def initialize(payload = {}) @recipient = '' @sender = Emissary.identity.queue_name @replyto = Emissary.identity.queue_name @originator = @sender.dup @status = [ :ok, '' ] # tuple of (status type, status message) @operation = -1 @thread = -1 @uuid = UUID.new.generate @errors = [] @agent = @method = nil @account = Emissary.identity.account_id @args = [] @time = { :received => nil, :sent => nil, } payload = { :headers => { :recipient => @recipient, :sender => @sender, :replyto => @replyto, :originator => @originator, :status => @status, :operation => @operation, :thread => @thread, :uuid => @uuid, :time => @time.merge((payload[:time].symbolize rescue {})), }.merge((payload[:headers].symbolize rescue {})), :data => { :account => @account, :agent => @agent, :method => @method, :args => @args }.merge((payload[:data].symbolize! rescue {})), :errors => [ ] + (payload[:errors].symbolize rescue []) } payload[:headers].merge(payload[:data]).each do |k,v| send("#{k}=".to_sym, v) rescue nil end payload[:errors].each do |e| exception = ::Emissary.klass_const(e[:type]).new(e[:message]) rescue StandardError.new("#{e[:type]}: #{e[:message]}") exception.set_backtrace(e[:backtrace]) errors << exception end @agent = @agent.to_sym rescue nil @method = @method.to_sym rescue nil @args = @args || [] rescue [] end |
Instance Attribute Details
#account ⇒ Object
Returns the value of attribute account.
24 25 26 |
# File 'lib/emissary/message.rb', line 24 def account @account end |
#agent ⇒ Object
Returns the value of attribute agent.
24 25 26 |
# File 'lib/emissary/message.rb', line 24 def agent @agent end |
#args ⇒ Object
Returns the value of attribute args.
24 25 26 |
# File 'lib/emissary/message.rb', line 24 def args @args end |
#errors(type = :default) ⇒ Object
Returns the value of attribute errors.
22 23 24 |
# File 'lib/emissary/message.rb', line 22 def errors @errors end |
#method ⇒ Object
Returns the value of attribute method.
24 25 26 |
# File 'lib/emissary/message.rb', line 24 def method @method end |
#operation ⇒ Object
Returns the value of attribute operation.
23 24 25 |
# File 'lib/emissary/message.rb', line 23 def operation @operation end |
#originator ⇒ Object (readonly)
Returns the value of attribute originator.
25 26 27 |
# File 'lib/emissary/message.rb', line 25 def originator @originator end |
#recipient ⇒ Object
Returns the value of attribute recipient.
22 23 24 |
# File 'lib/emissary/message.rb', line 22 def recipient @recipient end |
#replyto ⇒ Object
Returns the value of attribute replyto.
22 23 24 |
# File 'lib/emissary/message.rb', line 22 def replyto @replyto end |
#sender ⇒ Object
Returns the value of attribute sender.
22 23 24 |
# File 'lib/emissary/message.rb', line 22 def sender @sender end |
#status ⇒ Object
Returns the value of attribute status.
23 24 25 |
# File 'lib/emissary/message.rb', line 23 def status @status end |
#thread ⇒ Object
Returns the value of attribute thread.
23 24 25 |
# File 'lib/emissary/message.rb', line 23 def thread @thread end |
#time ⇒ Object
Returns the value of attribute time.
23 24 25 |
# File 'lib/emissary/message.rb', line 23 def time @time end |
#uuid ⇒ Object (readonly)
Returns the value of attribute uuid.
25 26 27 |
# File 'lib/emissary/message.rb', line 25 def uuid @uuid end |
Class Method Details
.decode(payload) ⇒ Object
160 161 162 163 164 165 166 167 168 |
# File 'lib/emissary/message.rb', line 160 def self.decode payload begin self.new BERT.decode(payload) rescue StandardError => e raise e unless e. =~ /bad magic/i Emissary.logger.error "Unable to decode message - maybe it wasn't encoded with BERT..?" raise ::Emissary::Error::InvalidMessageFormat, "Unable to decode message - maybe it wasn't encoded with BERT? Message: #{payload.inspect}" end end |
Instance Method Details
#bounce(message = nil) ⇒ Object
198 199 200 201 202 203 |
# File 'lib/emissary/message.rb', line 198 def bounce( = nil) ||= 'Message failed due to missing handler.' bounced = self.response bounced.status = [ :bounced, ] return bounced end |
#canonical_route(who = :recipient) ⇒ Object
148 149 150 |
# File 'lib/emissary/message.rb', line 148 def canonical_route who = :recipient "#{routing_key(who)}:#{exchange(who).join(':')}" end |
#data ⇒ Object
99 100 101 |
# File 'lib/emissary/message.rb', line 99 def data() return { :account => account, :agent => agent, :method => method, :args => args } end |
#encode ⇒ Object
156 157 158 |
# File 'lib/emissary/message.rb', line 156 def encode BERT.encode({ :headers => headers, :data => data, :errors => errors(:hashes) }) end |
#error(message = nil) ⇒ Object
205 206 207 208 209 210 211 212 213 214 215 |
# File 'lib/emissary/message.rb', line 205 def error( = nil) ||= 'Message failed due to unspecified error.' error = self.response error.status = [ :errored, .to_s ] if .is_a? Exception error.errors << else ::Emissary.logger.warning "#{.class.name} is not an exception..." end return error end |
#exchange(who = :recipient) ⇒ Object
128 129 130 |
# File 'lib/emissary/message.rb', line 128 def exchange who = :recipient [ exchange_type(who), exchange_name(who) ] end |
#exchange_name(who = :recipient) ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/emissary/message.rb', line 136 def exchange_name who = :recipient key, type, name = route(who) rescue [ nil, :direct, 'amq.direct' ] name || case type.to_sym when :fanout, :topic, :matches, :headers "amq.#{type}" else 'amq.direct' end rescue 'amq.direct' end |
#exchange_type(who = :recipient) ⇒ Object
132 133 134 |
# File 'lib/emissary/message.rb', line 132 def exchange_type who = :recipient route(who)[1].to_sym || :direct rescue :direct end |
#headers ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/emissary/message.rb', line 85 def headers() { :recipient => recipient, :sender => sender, :replyto => replyto, :originator => originator, :status => status, :operation => operation, :thread => thread, :time => time, :uuid => uuid } end |
#response ⇒ Object
184 185 186 187 188 189 190 191 192 193 194 195 196 |
# File 'lib/emissary/message.rb', line 184 def response header = { :recipient => replyto || sender, :status => [ :ok, '' ], :operation => operation, :thread => thread, :time => { :received => nil, :sent => nil } } return Message.new({ :headers => header, :data => data, :errors => errors(:hashes) }) end |
#route(who = :recipient) ⇒ Object
120 121 122 |
# File 'lib/emissary/message.rb', line 120 def route who = :recipient headers[who].split(':') || [] rescue [] end |
#routing_key(who = :recipient) ⇒ Object
124 125 126 |
# File 'lib/emissary/message.rb', line 124 def routing_key who = :recipient route(who)[0] || nil rescue nil end |
#stamp_received! ⇒ Object
175 176 177 178 |
# File 'lib/emissary/message.rb', line 175 def stamp_received! time[:received] = Time.now.to_f self end |
#stamp_sent! ⇒ Object
170 171 172 173 |
# File 'lib/emissary/message.rb', line 170 def stamp_sent! time[:sent] = Time.now.to_f self end |
#status_note ⇒ Object
118 |
# File 'lib/emissary/message.rb', line 118 def status_note() status[1] || '' rescue '' ; end |
#status_note=(n) ⇒ Object
117 |
# File 'lib/emissary/message.rb', line 117 def status_note=(n) status[1] = n; end |
#status_type ⇒ Object
115 |
# File 'lib/emissary/message.rb', line 115 def status_type() status[0] || :ok rescue :ok; end |
#status_type=(t) ⇒ Object
114 |
# File 'lib/emissary/message.rb', line 114 def status_type=(t) status[0] = t.to_sym; end |
#trip_time ⇒ Object
180 181 182 |
# File 'lib/emissary/message.rb', line 180 def trip_time (time[:sent].to_f - time[:received].to_f rescue 0.0) || 0.0 end |
#will_loop? ⇒ Boolean
152 153 154 |
# File 'lib/emissary/message.rb', line 152 def will_loop? canonical_route(:recipient) == canonical_route(:originator) end |