Class: Promiscuous::Subscriber::Message
- Inherits:
-
Object
- Object
- Promiscuous::Subscriber::Message
- Defined in:
- lib/promiscuous/subscriber/message.rb
Instance Attribute Summary collapse
-
#parsed_payload ⇒ Object
Returns the value of attribute parsed_payload.
-
#payload ⇒ Object
Returns the value of attribute payload.
Instance Method Summary collapse
- #ack ⇒ Object
- #app ⇒ Object
- #dependencies ⇒ Object
- #generation ⇒ Object
- #happens_before_dependencies ⇒ Object
- #has_dependencies? ⇒ Boolean
-
#initialize(payload, options = {}) ⇒ Message
constructor
A new instance of Message.
- #process ⇒ Object
- #timestamp ⇒ Object
- #to_s ⇒ Object
- #types ⇒ Object
- #write_dependencies ⇒ Object
Constructor Details
#initialize(payload, options = {}) ⇒ Message
Returns a new instance of Message.
4 5 6 7 8 |
# File 'lib/promiscuous/subscriber/message.rb', line 4 def initialize(payload, ={}) self.payload = payload @metadata = [:metadata] @root_worker = [:root_worker] end |
Instance Attribute Details
#parsed_payload ⇒ Object
Returns the value of attribute parsed_payload.
2 3 4 |
# File 'lib/promiscuous/subscriber/message.rb', line 2 def parsed_payload @parsed_payload end |
#payload ⇒ Object
Returns the value of attribute payload.
2 3 4 |
# File 'lib/promiscuous/subscriber/message.rb', line 2 def payload @payload end |
Instance Method Details
#ack ⇒ Object
62 63 64 65 66 67 68 69 70 71 |
# File 'lib/promiscuous/subscriber/message.rb', line 62 def ack time = Time.now Promiscuous.debug "[receive] #{payload}" @metadata.try(:ack) @root_worker.stats.(self, time) if @root_worker rescue Exception => e # We don't care if we fail, the message will be redelivered at some point Promiscuous.warn "[receive] Some exception happened, but it's okay: #{e}\n#{e.backtrace.join("\n")}" Promiscuous::Config.error_notifier.call(e) end |
#app ⇒ Object
14 15 16 |
# File 'lib/promiscuous/subscriber/message.rb', line 14 def app parsed_payload['app'] end |
#dependencies ⇒ Object
26 27 28 29 30 31 32 33 |
# File 'lib/promiscuous/subscriber/message.rb', line 26 def dependencies @dependencies ||= begin dependencies = parsed_payload['dependencies'] || {} deps = dependencies['write'].to_a.map { |dep| Promiscuous::Dependency.parse(dep, :type => :write, :owner => app) } deps end end |
#generation ⇒ Object
22 23 24 |
# File 'lib/promiscuous/subscriber/message.rb', line 22 def generation parsed_payload['generation'] || 0 end |
#happens_before_dependencies ⇒ Object
43 44 45 46 47 48 49 50 51 52 |
# File 'lib/promiscuous/subscriber/message.rb', line 43 def happens_before_dependencies @happens_before_dependencies ||= begin deps = [] deps += read_dependencies deps += write_dependencies.map { |dep| dep.dup.tap { |d| d.version -= 1 } } # We return the most difficult condition to satisfy first deps.uniq.reverse end end |
#has_dependencies? ⇒ Boolean
54 55 56 |
# File 'lib/promiscuous/subscriber/message.rb', line 54 def has_dependencies? dependencies.present? end |
#process ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/promiscuous/subscriber/message.rb', line 73 def process Promiscuous::Subscriber::UnitOfWork.process(self) rescue Exception => orig_e e = Promiscuous::Error::Subscriber.new(orig_e, :payload => payload) Promiscuous.warn "[receive] #{payload} #{e}\n#{e.backtrace.join("\n")}" Promiscuous::Config.error_notifier.call(e) ensure if defined?(ActiveRecord) ActiveRecord::Base.clear_active_connections! end end |
#timestamp ⇒ Object
18 19 20 |
# File 'lib/promiscuous/subscriber/message.rb', line 18 def parsed_payload['timestamp'].to_i end |
#to_s ⇒ Object
58 59 60 |
# File 'lib/promiscuous/subscriber/message.rb', line 58 def to_s "#{app} -> #{write_dependencies.join(', ')}" end |
#types ⇒ Object
35 36 37 |
# File 'lib/promiscuous/subscriber/message.rb', line 35 def types @parsed_payload['types'] end |
#write_dependencies ⇒ Object
39 40 41 |
# File 'lib/promiscuous/subscriber/message.rb', line 39 def write_dependencies @write_dependencies ||= dependencies.select(&:write?) end |