Class: ASS::Topic::Funnel

Inherits:
Object
  • Object
show all
Defined in:
lib/ass/topic.rb

Instance Method Summary collapse

Constructor Details

#initialize(tunnel_name, funnel_name, key_matcher) ⇒ Funnel

Returns a new instance of Funnel.



23
24
25
26
27
# File 'lib/ass/topic.rb', line 23

def initialize(tunnel_name,funnel_name,key_matcher)
  @funnel_name = funnel_name
  @exchange = ASS.dummy_exchange(tunnel_name)
  @matcher = key_matcher
end

Instance Method Details

#build_callback(callback) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/ass/topic.rb', line 65

def build_callback(callback)
  c = case callback
      when Proc
        Class.new &callback
      when Class
        callback
      when Module
        Class.new { include callback }
      else
        raise "can build topic callback from one of Proc, Class, Module"
      end
  raise "must react to on_event" unless c.public_method_defined?(:on_event)
  c
end

#queue(opts = {}) ⇒ Object



29
30
31
32
33
34
35
36
# File 'lib/ass/topic.rb', line 29

def queue(opts={})
  unless @queue
    @queue = MQ.queue(@funnel_name,opts)
    @queue.bind(@exchange.name,
                opts.merge({ :key => @matcher }))
  end
  @queue
end

#react(callback = nil, opts = {}, &block) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/ass/topic.rb', line 38

def react(callback=nil,opts={},&block)
  callback = build_callback(callback || block)
  me = self
  self.queue.subscribe(opts) do |info,payload|
    data = ASS.serializer.load(payload)
    handler = callback.new
    work = lambda {
      begin
        handler.send(:on_event,info.routing_key,data)
      rescue => e
        me.unhandled_error(e)
      end
    }
    done = lambda { |_|
      # nothing left to do
    }
    EM.defer work, done
  end
end

#unhandled_error(e) ⇒ Object



58
59
60
61
62
63
# File 'lib/ass/topic.rb', line 58

def unhandled_error(e)
  $stderr.puts e
  $stderr.puts e.backtrace
  ASS.stop
  raise e
end