Class: ASS::Topic::Funnel
- Inherits: Object
- Hierarchy:
  - Object
  - ASS::Topic::Funnel
- Defined in:
- lib/ass/topic.rb
Instance Method Summary
- #build_callback(callback) ⇒ Object
- #initialize(tunnel_name, funnel_name, key_matcher) ⇒ Funnel (constructor): A new instance of Funnel.
- #queue(opts = {}) ⇒ Object
- #react(callback = nil, opts = {}, &block) ⇒ Object
- #unhandled_error(e) ⇒ Object
Constructor Details
#initialize(tunnel_name, funnel_name, key_matcher) ⇒ Funnel
Returns a new instance of Funnel.
23 24 25 26 27 |
# File 'lib/ass/topic.rb', line 23

# Sets up a funnel on the given tunnel.
#
# @param tunnel_name name handed to ASS.dummy_exchange to obtain the exchange
# @param funnel_name stored as @funnel_name
# @param key_matcher stored as @matcher
def initialize(tunnel_name, funnel_name, key_matcher)
  @exchange = ASS.dummy_exchange(tunnel_name)
  @funnel_name = funnel_name
  @matcher = key_matcher
end
Instance Method Details
#build_callback(callback) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/ass/topic.rb', line 65

# Turns a callback definition into a handler class that responds to on_event.
#
# * a Proc is evaluated as the body of a new anonymous class
# * a Class is used as is
# * a Module is included into a new anonymous class
#
# @param callback [Proc, Class, Module] the handler definition
# @return [Class] a class with a public on_event instance method
# @raise [RuntimeError] if callback is none of the supported kinds, or if the
#   resulting class does not publicly define on_event
def build_callback(callback)
  handler_class =
    case callback
    when Proc
      # Explicit parentheses avoid the "`&' interpreted as argument prefix" warning.
      Class.new(&callback)
    when Class # must be tested before Module, since every Class is a Module
      callback
    when Module
      Class.new { include callback }
    else
      raise "can only build topic callback from one of Proc, Class, Module (got #{callback.class})"
    end
  raise "must react to on_event" unless handler_class.public_method_defined?(:on_event)

  handler_class
end
#queue(opts = {}) ⇒ Object
29 30 31 32 33 34 35 36 |
# File 'lib/ass/topic.rb', line 29

# Returns the funnel's queue, creating and binding it on first use.
#
# The queue is obtained from MQ.queue under @funnel_name and bound to the
# exchange's name with @matcher as the :key. The result is memoized in
# @queue, so +opts+ only has an effect on the first call.
#
# @param opts [Hash] options passed to MQ.queue and merged into the bind options
# @return the memoized queue
def queue(opts = {})
  return @queue if @queue

  @queue = MQ.queue(@funnel_name, opts)
  exchange_name = @exchange.name
  @queue.bind(exchange_name, opts.merge(:key => @matcher))
  @queue
end
#react(callback = nil, opts = {}, &block) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/ass/topic.rb', line 38

# Subscribes to the funnel's queue and dispatches each message to a handler.
#
# The handler class is built by build_callback from +callback+, or from the
# block when no callback is given. For every delivered message the payload is
# deserialized with ASS.serializer, a fresh handler is instantiated, and a job
# calling its on_event with the routing key and the data is handed to
# EM.defer. Errors raised by on_event are passed to #unhandled_error; errors
# from deserialization or handler instantiation are not rescued here.
#
# @param callback [Proc, Class, Module, nil] handler definition
# @param opts [Hash] options forwarded to the queue's subscribe call
def react(callback = nil, opts = {}, &block)
  handler_class = build_callback(callback || block)
  funnel = self # explicit reference to this funnel for the nested closures
  queue.subscribe(opts) do |info, payload|
    data = ASS.serializer.load(payload)
    handler = handler_class.new
    job = lambda do
      begin
        handler.send(:on_event, info.routing_key, data)
      rescue => e
        funnel.unhandled_error(e)
      end
    end
    # Completion callback: nothing left to do once the job has run.
    on_finish = lambda { |_result| }
    EM.defer(job, on_finish)
  end
end