Class: PerfectSched::Engine
- Inherits:
-
Object
- Object
- PerfectSched::Engine
- Defined in:
- lib/perfectsched/engine.rb
Instance Method Summary
- #finished? ⇒ Boolean
-
#initialize(backend, queue, log, conf) ⇒ Engine
constructor
A new instance of Engine.
- #process(token, task) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(backend, queue, log, conf) ⇒ Engine
Returns a new instance of Engine.
6 7 8 9 10 11 12 13 14 15 16 17 |
# File 'lib/perfectsched/engine.rb', line 6

# Stores the collaborators and polling settings used by the engine.
#
# @param backend [Object] kept in @backend; #run calls +acquire+ on it and
#   #process calls +finish+ on it
# @param queue [Object] kept in @queue; #process calls +submit+ on it
# @param log [Object] kept in @log; receives +debug+ and +info+ messages
# @param conf [#[]] settings looked up by symbol key: +:timeout+ is stored
#   as given (no default is applied here) and +:poll_interval+ falls back to 1
def initialize(backend, queue, log, conf)
  # Loaded on construction; #process formats timestamps with Time#iso8601.
  require 'time'

  @backend, @queue, @log = backend, queue, log

  @timeout = conf[:timeout]
  @poll_interval = conf[:poll_interval] || 1

  @croncalc = CronCalc.new
  # Flag read by #finished? and flipped by #stop.
  @finished = false
end
Instance Method Details
#finished? ⇒ Boolean
19 20 21 |
# File 'lib/perfectsched/engine.rb', line 19

# Reports whether the engine has been asked to stop.
#
# @return [Boolean] the current value of @finished (set to false by the
#   constructor and to true by #stop)
def finished?
  @finished
end
#process(token, task) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/perfectsched/engine.rb', line 37

# Submits one acquired schedule to the queue and records its next run.
#
# Steps, in order: log the start, submit +task.data+ to @queue under the id
# returned by +gen_id(task)+, ask @croncalc for the next time after
# +task.time+, add +task.delay+ to get the next run, and report both values
# to @backend.finish. Any StandardError raised by those steps is logged at
# error level and swallowed, so a single failing schedule does not propagate
# to the caller.
#
# @param token [Object] handle passed through to +@backend.finish+
# @param task [Object] must respond to +id+, +time+, +cron+, +delay+ and +data+
# @return [Object] whatever the final log call returns
def process(token, task)
  @log.info "processing schedule id=#{task.id} time=#{Time.at(task.time).iso8601} at #{Time.now.iso8601}"

  begin
    id = gen_id(task)
    # NOTE(review): the original comment here read "ignore already exists
    # error". Nothing in this method special-cases that error, so presumably
    # @queue.submit handles it itself -- confirm against the queue class.
    @queue.submit(id, task.data)

    next_time = @croncalc.next_time(task.cron, task.time)
    next_run = next_time + task.delay
    @backend.finish(token, next_time, next_run)

    @log.info "submitted schedule id=#{task.id}"
  rescue StandardError => e
    # Failures are reported at error level (previously info) so they remain
    # visible when the logger filters out informational messages.
    @log.error "failed schedule id=#{task.id}: #{e}"
  end
end
#run ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/perfectsched/engine.rb', line 23

# Polls the backend until #finished? returns true.
#
# Each pass logs a debug line, calls +@backend.acquire+ with the current
# epoch second plus @timeout, and either hands the acquired pair to #process
# or, when no token came back, sleeps for @poll_interval before retrying.
#
# @return [nil]
def run
  until finished?
    @log.debug "polling... #{@timeout} #{@poll_interval}"

    # NOTE(review): presumably the time until which the acquired schedule is
    # reserved for this worker -- confirm against the backend implementation.
    timeout_at = Time.now.to_i + @timeout
    token, task = @backend.acquire(timeout_at)

    if token
      process(token, task)
    else
      sleep @poll_interval
    end
  end
end
#shutdown ⇒ Object
60 61 |
# File 'lib/perfectsched/engine.rb', line 60

# No-op: the method body is empty.
#
# @return [nil]
def shutdown
end
#stop ⇒ Object
56 57 58 |
# File 'lib/perfectsched/engine.rb', line 56

# Raises the stop flag so that #finished? returns true; the #run loop checks
# that flag at the top of each pass.
#
# @return [true]
def stop
  @finished = true
end