Class: Celerb::ResultConsumer

Inherits:
Object
Defined in:
lib/celerb/result_consumer.rb

Instance Method Summary

Constructor Details

#initialize(channel, opts = {}) ⇒ ResultConsumer

Returns a new instance of ResultConsumer.



4
5
6
7
8
9
10
11
12
# File 'lib/celerb/result_consumer.rb', line 4

# Builds a consumer with an empty handler table and schedules the sweep
# that discards handlers whose results never arrived.
#
# Every 60 seconds an EventMachine periodic timer removes each handler
# whose :expires time is earlier than the current time. The queue of a
# removed handler is unsubscribed as well, mirroring what #consume does
# after a result is delivered; otherwise a task that never reports back
# would leave its subscription open indefinitely.
#
# @param channel [Object] stored in @channel (not used by this method)
# @param opts [Hash] stored in @options
# @return [ResultConsumer]
def initialize(channel, opts = {})
  @channel = channel
  @options = opts
  @handlers = {}
  EM.add_periodic_timer(60) do
    now = Time.now
    @handlers.delete_if do |_task_id, handler|
      expired = handler[:expires] < now
      # Release the subscription before the entry is dropped; once it is
      # gone from @handlers nothing else holds a reference to the queue.
      handler[:queue].unsubscribe if expired
      expired
    end
  end
end

Instance Method Details

#consume(header, body) ⇒ Object



22
23
24
25
26
27
28
29
# File 'lib/celerb/result_consumer.rb', line 22

# Delivers an incoming result message to the handler registered for its
# task, then releases that handler's queue subscription.
#
# The body is decoded with MessagePack.unpack and wrapped in a Result. If
# no handler is registered for the result's task_id the message is ignored.
# Otherwise the handler is removed from the table, its callback is invoked
# with the Result, and its queue is unsubscribed. The unsubscribe runs even
# when the callback raises: the handler has already been removed at that
# point, so nothing else would ever release the subscription.
#
# @param header [Object] not used by this method
# @param body [String] MessagePack-encoded payload (passed to MessagePack.unpack)
# @return [Object, nil] whatever the queue's unsubscribe returns, or nil
#   when no handler was registered for the task
# @raise whatever the registered callback raises (after unsubscribing)
def consume(header, body)
  result = Result.new(MessagePack.unpack(body))
  handler = @handlers.delete(result.task_id)
  return unless handler

  begin
    handler[:proc].call(result)
  ensure
    unsubscribed = handler[:queue].unsubscribe
  end
  unsubscribed
end

#register(task_id, expiration, &blk) ⇒ Object



14
15
16
17
18
19
20
# File 'lib/celerb/result_consumer.rb', line 14

# Registers a callback for the result of the given task.
#
# Subscribes for the task via #subscribe, then records the returned queue,
# the expiry time (current time plus +expiration+) and the callback block
# under +task_id+ in the handler table. An existing entry for the same
# task_id is overwritten.
#
# @param task_id [Object] key under which the handler is stored
# @param expiration [Numeric] offset added to Time.now to get the expiry time
# @yield the block stored as the handler's :proc
# @return [Hash] the stored entry (:queue, :expires, :proc)
def register(task_id, expiration, &blk)
  queue = subscribe(task_id)
  deadline = Time.now + expiration
  entry = { :queue => queue, :expires => deadline, :proc => blk }
  @handlers[task_id] = entry
end