Class: Untied::Consumer::Processor

Inherits:
Object
  • Object
show all
Defined in:
lib/untied-consumer/processor.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Processor

Returns a new instance of Processor.



7
8
9
10
# File 'lib/untied-consumer/processor.rb', line 7

# Builds this processor's observer list from the class-level registry.
#
# Every entry returned by +self.class.observers+ is turned into a String,
# passed through +camelize+ and +constantize+, and +instance+ is called on the
# result; the collected objects are stored in +@observers+.
#
# NOTE(review): +camelize+/+constantize+ are presumably the ActiveSupport
# inflections and +instance+ presumably a singleton accessor -- confirm.
def initialize
  registered = self.class.observers
  @observers = registered.map do |entry|
    entry.to_s.camelize.constantize.instance
  end
end

Instance Attribute Details

#observers ⇒ Object (readonly)

Returns the value of attribute observers.



5
6
7
# File 'lib/untied-consumer/processor.rb', line 5

# Reader for the +@observers+ instance variable, which #initialize fills with
# the objects built from the class-level observer registry.
#
# @return [Object] whatever is currently stored in +@observers+
def observers
  @observers
end

Class Method Details

.observers ⇒ Object



40
41
42
# File 'lib/untied-consumer/processor.rb', line 40

# Class-level observer registry.
#
# Lazily initialises +@observers+ to an empty Array the first time it is read
# (or whenever it currently holds nil/false) and returns the stored value.
#
# @return [Object] the stored registry; an Array unless something else was
#   assigned to +@observers+
def observers
  @observers = [] unless @observers
  @observers
end

.observers=(*obs) ⇒ Object



36
37
38
# File 'lib/untied-consumer/processor.rb', line 36

# Replaces the class-level observer registry.
#
# Accepts a single value, an Array, or several arguments; whatever is passed
# is gathered into one Array and flattened at every nesting level before it
# is stored in +@observers+.
#
# @param obs [Array] observer identifiers, possibly nested
# @return [Array] the flattened list that was stored
def observers=(*obs)
  flattened = obs.flatten
  @observers = flattened
end

Instance Method Details

#process(headers, message) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/untied-consumer/processor.rb', line 12

# Decodes a raw JSON message and forwards the event it describes to every
# observer returned by +observers+.
#
# The decoded document is read as
#   { "event" => { "name" => ..., "origin" => ..., "payload" => { ... } } }
# where a missing "event" or "payload" key is treated as an empty Hash.
# Each observer receives +notify(event_name, klass, service, payload)+.
#
# @param headers [Object] accepted but not read by this method
# @param message [String] JSON text to decode
# @return [nil] when +message+ is not valid JSON; the failure is logged and
#   no observer is notified
# @return [Object] otherwise, the result of iterating +observers+
def process(headers, message)
  begin
    message = JSON.parse(message)
  rescue JSON::ParserError => e
    # `logger` is a reader (it is used as `logger.info` further down), so the
    # text must go through a severity method; passing it to `logger` itself
    # would raise ArgumentError instead of logging the parse failure.
    Consumer.config.logger.error "Untied::Processor: Parsing error #{e}"
    return
  end

  event = message.fetch("event", {})
  payload = event.fetch("payload", {})
  service = event["origin"].try(:to_sym)
  event_name = event["name"].try(:to_sym)
  # The first key of the payload is what observers receive as `klass`.
  klass = payload.keys.first.try(:to_sym)

  Consumer.config.logger.info \
    "Untied::Processor: processing event #{event_name} from #{service} with " \
    "payload #{payload}"

  observers.each do |observer|
    observer.notify(event_name, klass, service, payload)
  end
end