Class: RCelery::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/rcelery/pool.rb

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Pool

Returns a new instance of Pool.



7
8
9
10
# File 'lib/rcelery/pool.rb', line 7

# Builds the pool's in-memory task queue and makes sure RCelery is running.
#
# @param options [Hash] passed straight through to RCelery.start; only used
#   when RCelery is not already running
def initialize(options = {})
  @task_queue = Queue.new

  # Starting is skipped when RCelery reports that it is already running.
  unless RCelery.running?
    RCelery.start(options)
  end
end

Instance Method Details

#defer(task) ⇒ Object



38
39
40
41
42
43
# File 'lib/rcelery/pool.rb', line 38

# Schedules a task to be pushed onto the task queue once its ETA is reached.
#
# @param task [Hash] must hold the decoded message under :message, and that
#   message must carry a time string under 'eta' that Time.parse accepts
# @return [Object] whatever EM.add_timer returns
# @raise [ArgumentError] if the 'eta' string cannot be parsed as a time
def defer(task)
  eta = Time.parse(task[:message]['eta'])

  # Round the remaining time UP to whole seconds: truncating (to_i) would
  # release the task up to a second before its ETA. Clamp at zero so an ETA
  # that has already passed never produces a negative timer interval.
  delay = [(eta - Time.now).ceil, 0].max

  EM.add_timer(delay) do
    @task_queue.push(task)
  end
end

#poll ⇒ Object



45
46
47
# File 'lib/rcelery/pool.rb', line 45

# Takes the next task off the task queue, oldest first.
#
# Queue#deq is the standard-library alias of Queue#pop, so the caller
# blocks while the queue is empty.
#
# @return [Object] the dequeued task
def poll
  @task_queue.deq
end

#start ⇒ Object



12
13
14
# File 'lib/rcelery/pool.rb', line 12

# Starts the pool. At present this only delegates to #subscribe.
#
# @return [Object] whatever #subscribe returns
def start
  subscribe
end

#stop ⇒ Object



53
54
55
56
# File 'lib/rcelery/pool.rb', line 53

# Stops the pool: calls #unsubscribe first, then RCelery.stop.
#
# @return [Object] whatever RCelery.stop returns
def stop
  unsubscribe
  RCelery.stop
end

#subscribe ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/rcelery/pool.rb', line 16

# Subscribes to the RCelery queue and routes each delivery:
#
# * a payload that is not valid JSON, or whose 'eta' cannot be parsed as a
#   time, is acked and dropped;
# * a message whose 'eta' lies in the future is handed to #defer;
# * every other message is pushed onto the task queue right away.
#
# Accepted messages are queued together with their header as
# {:message => ..., :header => ...}; they are not acked here.
#
# @return [Object] whatever RCelery.queue.subscribe returns
def subscribe
  # amqp-client has a nice fat TODO in the delivery handler to
  # ack if necessary; we'll just manually do it, however, the
  # call to subscribe still needs :ack => true so the server
  # expects our ack
  RCelery.queue.subscribe(:ack => true) do |header, payload|
    begin
      message = JSON.parse(payload)
      eta = message['eta']
      # Parse the ETA before announcing the task. Time.parse raises
      # ArgumentError for an unparseable string and TypeError for a
      # non-string value; letting either escape this callback would leave
      # the message unacked.
      eta_time = eta ? Time.parse(eta) : nil
    rescue JSON::ParserError, ArgumentError, TypeError
      # not a message we can use; ack it so the server does not keep it
      header.ack
      next
    end

    RCelery::Events.task_received(message['id'], message['task'], message['args'], message['kwargs'], nil, eta)

    task = {:message => message, :header => header}
    if eta_time && eta_time > Time.now
      defer(task)
    else
      @task_queue.push(task)
    end
  end
end

#unsubscribe ⇒ Object



49
50
51
# File 'lib/rcelery/pool.rb', line 49

# Cancels this pool's subscription on the RCelery queue.
#
# @return [Object] whatever the queue's #unsubscribe returns
def unsubscribe
  queue = RCelery.queue
  queue.unsubscribe
end