Class: Promiscuous::Subscriber::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/promiscuous/subscriber/message.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(payload, options = {}) ⇒ Message

Returns a new instance of Message.



4
5
6
7
8
# File 'lib/promiscuous/subscriber/message.rb', line 4

def initialize(payload, options={})
  self.payload = payload
  @metadata = options[:metadata]
  @root_worker = options[:root_worker]
end

Instance Attribute Details

#parsed_payloadObject

Returns the value of attribute parsed_payload.



2
3
4
# File 'lib/promiscuous/subscriber/message.rb', line 2

def parsed_payload
  @parsed_payload
end

#payloadObject

Returns the value of attribute payload.



2
3
4
# File 'lib/promiscuous/subscriber/message.rb', line 2

def payload
  @payload
end

Instance Method Details

#ackObject



62
63
64
65
66
67
68
69
70
71
# File 'lib/promiscuous/subscriber/message.rb', line 62

def ack
  time = Time.now
  Promiscuous.debug "[receive] #{payload}"
  @metadata.try(:ack)
  @root_worker.stats.notify_processed_message(self, time) if @root_worker
rescue Exception => e
  # We don't care if we fail, the message will be redelivered at some point
  Promiscuous.warn "[receive] Some exception happened, but it's okay: #{e}\n#{e.backtrace.join("\n")}"
  Promiscuous::Config.error_notifier.call(e)
end

#appObject



14
15
16
# File 'lib/promiscuous/subscriber/message.rb', line 14

def app
  parsed_payload['app']
end

#dependenciesObject



26
27
28
29
30
31
32
33
# File 'lib/promiscuous/subscriber/message.rb', line 26

def dependencies
  @dependencies ||= begin
    dependencies = parsed_payload['dependencies'] || {}
    deps = dependencies['write'].to_a.map { |dep| Promiscuous::Dependency.parse(dep, :type => :write, :owner => app) }

    deps
  end
end

#generationObject



22
23
24
# File 'lib/promiscuous/subscriber/message.rb', line 22

def generation
  parsed_payload['generation'] || 0
end

#happens_before_dependenciesObject



43
44
45
46
47
48
49
50
51
52
# File 'lib/promiscuous/subscriber/message.rb', line 43

def happens_before_dependencies
  @happens_before_dependencies ||= begin
    deps = []
    deps += read_dependencies
    deps += write_dependencies.map { |dep| dep.dup.tap { |d| d.version -= 1 } }

    # We return the most difficult condition to satisfy first
    deps.uniq.reverse
  end
end

#has_dependencies?Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/promiscuous/subscriber/message.rb', line 54

def has_dependencies?
  dependencies.present?
end

#processObject



73
74
75
76
77
78
79
80
81
82
83
# File 'lib/promiscuous/subscriber/message.rb', line 73

def process
  Promiscuous::Subscriber::UnitOfWork.process(self)
rescue Exception => orig_e
  e = Promiscuous::Error::Subscriber.new(orig_e, :payload => payload)
  Promiscuous.warn "[receive] #{payload} #{e}\n#{e.backtrace.join("\n")}"
  Promiscuous::Config.error_notifier.call(e)
ensure
  if defined?(ActiveRecord)
    ActiveRecord::Base.clear_active_connections!
  end
end

#timestampObject



18
19
20
# File 'lib/promiscuous/subscriber/message.rb', line 18

def timestamp
  parsed_payload['timestamp'].to_i
end

#to_sObject



58
59
60
# File 'lib/promiscuous/subscriber/message.rb', line 58

def to_s
  "#{app} -> #{write_dependencies.join(', ')}"
end

#typesObject



35
36
37
# File 'lib/promiscuous/subscriber/message.rb', line 35

def types
  @parsed_payload['types']
end

#write_dependenciesObject



39
40
41
# File 'lib/promiscuous/subscriber/message.rb', line 39

def write_dependencies
  @write_dependencies ||= dependencies.select(&:write?)
end