Class: RSched::Engine
- Inherits:
-
Object
- Object
- RSched::Engine
- Defined in:
- lib/rsched/engine.rb
Defined Under Namespace
Classes: Sched, TimerThread
Instance Method Summary collapse
- #init_proc(run_proc, kill_proc) ⇒ Object
-
#initialize(lock, conf) ⇒ Engine
constructor
A new instance of Engine.
- #run ⇒ Object
-
#set_sched(ident, action, cron) ⇒ Object
=> (ident,action).
- #shutdown ⇒ Object
Constructor Details
#initialize(lock, conf) ⇒ Engine
Returns a new instance of Engine.
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/rsched/engine.rb', line 48 def initialize(lock, conf) @lock = lock @resume = conf[:resume] @delay = conf[:delay] @interval = conf[:interval] @delete = conf[:delete] @extend_timeout = conf[:extend_timeout] @kill_timeout = conf[:kill_timeout] @kill_retry = conf[:kill_retry] @sched_start = conf[:from] || 0 @release_on_fail = conf[:release_on_fail] @env = conf[:env] || {} @finished = false @ss = {} @extender = TimerThread.new(@lock, @extend_timeout, @kill_timeout, @kill_retry) @mutex = Mutex.new @cond = ConditionVariable.new end |
Instance Method Details
#init_proc(run_proc, kill_proc) ⇒ Object
75 76 77 78 |
# File 'lib/rsched/engine.rb', line 75 def init_proc(run_proc, kill_proc) @run_proc = run_proc @extender.init_proc(kill_proc) end |
#run ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/rsched/engine.rb', line 80 def run @extender.start until @finished one = false sched_time = Time.now.to_i - @delay @ss.each_pair {|ident,s| s.sched(sched_time) s.queue.delete_if {|time| next if @finished token = @lock.acquire(ident, time) case token when nil # already finished true when false # not finished but already locked false else one = true process(token, ident, time, s.action) end } break if @finished } return if @finished unless one cond_wait(@interval) end end end |
#set_sched(ident, action, cron) ⇒ Object
=> (ident,action)
70 71 72 73 |
# File 'lib/rsched/engine.rb', line 70 def set_sched(ident, action, cron) now = Time.now.to_i @ss[ident] = Sched.new(cron, action, @sched_start, now-@delay-@resume, now-@delay) end |
#shutdown ⇒ Object
120 121 122 123 124 125 126 |
# File 'lib/rsched/engine.rb', line 120 def shutdown @finished = true @extender.shutdown @mutex.synchronize { @cond.broadcast } end |