Class: Celerb::ResultConsumer
- Inherits:
-
Object
- Object
- Celerb::ResultConsumer
- Defined in:
- lib/celerb/result_consumer.rb
Instance Method Summary collapse
- #consume(header, body) ⇒ Object
-
#initialize(channel, opts = {}) ⇒ ResultConsumer
constructor
A new instance of ResultConsumer.
- #register(task_id, expiration, &blk) ⇒ Object
Constructor Details
#initialize(channel, opts = {}) ⇒ ResultConsumer
Returns a new instance of ResultConsumer.
4 5 6 7 8 9 10 11 12 |
# File 'lib/celerb/result_consumer.rb', line 4

# Creates the consumer and schedules a recurring sweep of expired handlers.
#
# Every 60 seconds (via EM.add_periodic_timer) each handler whose :expires
# time lies in the past is removed from @handlers and its queue is
# unsubscribed, so a task that never delivers a result does not keep its
# subscription open indefinitely.
#
# @param channel [Object] stored in @channel; not otherwise used by this method
# @param opts [Hash] stored in @options; not otherwise used by this method
def initialize(channel, opts = {})
  @channel  = channel
  @options  = opts
  # task_id => { :queue => ..., :expires => Time, :proc => ... } (filled by #register)
  @handlers = {}

  EM.add_periodic_timer(60) do
    now = Time.now
    # Collect first, then mutate: avoids side effects while iterating @handlers.
    expired = @handlers.select { |_id, handler| handler[:expires] < now }
    expired.each do |id, handler|
      # Drop the entry before unsubscribing so a failing unsubscribe is not
      # retried on every subsequent tick.
      @handlers.delete(id)
      # Same cleanup #consume performs once a result has been delivered.
      handler[:queue].unsubscribe
    end
  end
end
Instance Method Details
#consume(header, body) ⇒ Object
22 23 24 25 26 27 28 29 |
# File 'lib/celerb/result_consumer.rb', line 22

# Decodes an incoming result message and dispatches it to the handler
# registered for its task id.
#
# The handler is removed from @handlers, its :proc is called with the decoded
# Result, and its :queue is unsubscribed. The unsubscribe runs even when the
# callback raises: the handler has already been removed at that point, so
# nothing else would ever release the subscription.
#
# @param header [Object] not used by this method
# @param body [String] MessagePack-encoded payload, passed to MessagePack.unpack
# @return [Object, nil] whatever the queue's #unsubscribe returns, or nil when
#   no handler is registered for the result's task id
# @raise whatever the registered callback raises (after unsubscribing)
def consume(header, body)
  result  = Result.new(MessagePack.unpack(body))
  handler = @handlers.delete(result.task_id)
  return unless handler

  unsubscribed = nil
  begin
    callback = handler[:proc]
    # #register stores nil here when it was called without a block.
    callback.call(result) if callback
  ensure
    unsubscribed = handler[:queue].unsubscribe
  end
  # Keep the previous return value (the result of #unsubscribe).
  unsubscribed
end
#register(task_id, expiration, &blk) ⇒ Object
14 15 16 17 18 19 20 |
# File 'lib/celerb/result_consumer.rb', line 14

# Records a handler for +task_id+ in @handlers.
#
# The stored entry holds the value returned by subscribe(task_id), the time
# at which the handler expires (Time.now + expiration), and the given block.
#
# @param task_id [Object] key under which the handler is stored
# @param expiration [Numeric] added to Time.now to compute the :expires time
# @yield stored under :proc; called by #consume with the decoded result
# @return [Hash] the handler entry that was stored
def register(task_id, expiration, &blk)
  queue    = subscribe(task_id)
  deadline = Time.now + expiration
  @handlers[task_id] = { :queue => queue, :expires => deadline, :proc => blk }
end