Module: Marconi::Receiver::ClassMethods

Defined in:
lib/marconi/receiver.rb

Instance Method Summary

Instance Method Details

#broadcasts_suppressed? ⇒ Boolean

Returns:

  • (Boolean)


65
66
67
# File 'lib/marconi/receiver.rb', line 65

# Reports whether broadcasts are currently suppressed for this receiver.
#
# The backing @suppress_broadcasts variable is nil until something assigns
# it, so the value is coerced here to keep the documented Boolean contract.
#
# @return [Boolean] true while the flag is set, false otherwise (including
#   when it has never been assigned)
def broadcasts_suppressed?
  @suppress_broadcasts ? true : false
end

#exchange ⇒ Object



73
74
75
# File 'lib/marconi/receiver.rb', line 73

# The exchange this receiver subscribes to in #listen.
#
# @return [Object] whatever Marconi.outbound returns
def exchange
  Marconi.outbound
end

#handlers ⇒ Object



27
28
29
# File 'lib/marconi/receiver.rb', line 27

# Registry of handler lists, keyed by operation. It is created on first
# access and reused afterwards until it is reset to nil.
#
# @return [HashWithIndifferentAccess] the memoized registry
def handlers
  @handlers = HashWithIndifferentAccess.new unless @handlers
  @handlers
end

#listen(max = nil) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/marconi/receiver.rb', line 40

# Subscribes to the exchange and dispatches each delivered message to the
# handlers registered for its operation.
#
# The queue is named "<application_name>.<master_model_name>" and bound with
# the routing key "#.<master_model_name>.#". Every delivery is parsed with
# Envelope.from_xml, and all of its messages are processed inside
# #suppress_broadcasts. Each message is logged; messages whose operation has
# no registered handlers are then skipped.
#
# @param max [Object, nil] forwarded to the subscription as :message_max
#   (presumably a cap on the number of messages consumed; confirm against
#   the exchange implementation)
# @return [Object] whatever exchange.subscribe returns
def listen(max = nil)
  queue_name  = "#{Marconi.application_name}.#{master_model_name}"
  routing_key = "#.#{master_model_name}.#"

  exchange.subscribe(queue_name, :key => routing_key, :message_max => max) do |delivery|
    envelope = Envelope.from_xml(delivery[:payload])
    suppress_broadcasts do
      envelope.messages.each do |message|
        log_message_action(message, "Receiving")
        callbacks = handlers[message[:meta][:operation]]
        next unless callbacks
        callbacks.each { |callback| callback.call(message) }
      end
    end
  end
end

#log_message_action(amqp_mesg, info_mesg) ⇒ Object



31
32
33
34
35
36
37
38
# File 'lib/marconi/receiver.rb', line 31

# Writes one formatted line describing a message to both the receiver's
# logger (at debug level) and Marconi.log.
#
# The line holds the message guid (right-aligned to 28 columns), the
# operation (right-aligned to 9 columns), the current time and the
# caller-supplied description.
#
# @param amqp_mesg [Hash] message with a :meta hash holding :guid and :operation
# @param info_mesg [String] short description of the action, e.g. "Receiving"
# @return [Object] whatever Marconi.log returns
def log_message_action(amqp_mesg, info_mesg)
  meta = amqp_mesg[:meta]
  line = Kernel.format("%28s %9s %s %s",
                       meta[:guid], meta[:operation], Time.now.to_s, info_mesg)
  logger.debug(line)
  Marconi.log(line)
end

#master_model_name ⇒ Object



18
19
20
# File 'lib/marconi/receiver.rb', line 18

# Name used by #listen to build the queue name and routing key.
#
# @return [String, nil] the value stored by #setup, or nil if #setup has not
#   been called yet
def master_model_name
  @master_model_name
end

#purge_handlers ⇒ Object



69
70
71
# File 'lib/marconi/receiver.rb', line 69

# Drops every registered handler by clearing the memoized registry; the next
# call to #handlers builds a fresh, empty one.
#
# @return [nil]
def purge_handlers
  @handlers = nil
end

#register(operation, &block) ⇒ Object



22
23
24
25
# File 'lib/marconi/receiver.rb', line 22

# Registers a handler block for an operation. Several handlers may be
# registered for the same operation; they are kept in registration order.
#
# @param operation [String, Symbol] operation name the handler responds to
# @yield [message] invoked by #listen for each message with this operation
# @return [Array<Proc>] the handler list for the operation, after appending
# @raise [ArgumentError] if no block is given (a nil entry would otherwise
#   only fail later, when #listen tries to call it)
def register(operation, &block)
  raise ArgumentError, "register requires a handler block" unless block

  # Deliberately two statements: the append goes to the array actually stored
  # in the registry, not to the right-hand side of the assignment. The two
  # can differ if the registry copies values on write.
  # NOTE(review): confirm for the HashWithIndifferentAccess version in use.
  handlers[operation] ||= []
  handlers[operation] << block
end

#setup ⇒ Object



14
15
16
# File 'lib/marconi/receiver.rb', line 14

# Derives the master model name from the receiver's own name and stores it
# for later use by #master_model_name.
#
# NOTE(review): String#underscore is not core Ruby; presumably it comes from
# ActiveSupport. Confirm it is loaded wherever this module is used.
#
# @return [String] the underscored name that was stored
def setup
  @master_model_name = name.underscore
end

#suppress_broadcasts ⇒ Object



58
59
60
61
62
63
# File 'lib/marconi/receiver.rb', line 58

# Runs the given block with the broadcast-suppression flag set, then puts
# the flag back to the state it had before the call.
#
# Restoring the previous state (rather than always resetting to false) keeps
# nested calls correct: an inner call finishing must not re-enable
# broadcasts while an outer call's block is still running.
#
# @yield the work to perform while broadcasts are suppressed
# @return [Object] the block's return value
def suppress_broadcasts
  previously_suppressed = @suppress_broadcasts
  @suppress_broadcasts = true
  yield
ensure
  # Normalize a never-assigned (nil) flag to false, as the original did.
  @suppress_broadcasts = previously_suppressed ? true : false
end