Class: RCelery::Pool
- Inherits:
- Object
- Inheritance chain: Object → RCelery::Pool
- Defined in:
- lib/rcelery/pool.rb
Instance Method Summary collapse
- #defer(task) ⇒ Object
- #initialize(options = {}) ⇒ Pool (constructor) — A new instance of Pool.
- #poll ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #subscribe ⇒ Object
- #unsubscribe ⇒ Object
Constructor Details
Instance Method Details
#defer(task) ⇒ Object
38 39 40 41 42 43 |
# File 'lib/rcelery/pool.rb', line 38

# Schedules +task+ to be pushed onto the task queue once its ETA arrives.
#
# @param task [Hash] entry whose +:message+ value holds the parsed message;
#   the message's 'eta' field must be a string that Time.parse accepts
# @return [Object] whatever EM.add_timer returns
def defer(task)
  # Whole seconds between now and the ETA; to_i drops the fractional part.
  time_difference = (Time.parse(task[:message]['eta']) - Time.now).to_i

  # Hand the task to the queue only when the timer fires.
  EM.add_timer(time_difference) do
    @task_queue.push(task)
  end
end
#poll ⇒ Object
45 46 47 |
# File 'lib/rcelery/pool.rb', line 45

# Takes the next entry off the task queue.
#
# @return [Object] the value returned by +@task_queue.pop+
#   (presumably a Hash with +:message+ and +:header+ keys, as pushed by
#   #subscribe and #defer — confirm against the queue's type)
def poll
  @task_queue.pop
end
#start ⇒ Object
12 13 14 |
# File 'lib/rcelery/pool.rb', line 12

# Starts the pool by subscribing to the queue.
#
# @return [Object] whatever #subscribe returns
def start
  subscribe
end
#stop ⇒ Object
53 54 55 56 |
# File 'lib/rcelery/pool.rb', line 53

# Stops the pool: unsubscribes first, then calls RCelery.stop.
#
# @return [Object] whatever RCelery.stop returns
def stop
  unsubscribe
  RCelery.stop
end
#subscribe ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/rcelery/pool.rb', line 16

# Subscribes to RCelery.queue and routes each delivery.
#
# Each payload is parsed as JSON and reported via
# RCelery::Events.task_received. A message whose 'eta' lies in the future
# is passed to #defer; any other message is pushed straight onto the task
# queue. Payloads that are not valid JSON are acked and dropped.
#
# @return [Object] whatever RCelery.queue.subscribe returns
def subscribe
  # amqp-client has a nice fat TODO in the delivery handler to
  # ack if necessary; we'll just manually do it, however, the
  # call to subscribe still needs :ack => true so the server
  # expects our ack
  RCelery.queue.subscribe(:ack => true) do |header, payload|
    begin
      message = JSON.parse(payload)
      RCelery::Events.task_received(message['id'], message['task'], message['args'], message['kwargs'], nil, message['eta'])

      if message['eta'] && Time.parse(message['eta']) > Time.now
        defer({:message => message, :header => header})
      else
        @task_queue.push({:message => message, :header => header})
      end
    rescue JSON::ParserError
      # not a message we care about
      header.ack
    end
  end
end