Class: RSched::Engine
- Inherits: Object
- Inheritance chain: Object → RSched::Engine
- Defined in:
- lib/rsched/engine.rb
Defined Under Namespace
Classes: Sched, TimerThread
Instance Method Summary
- #init_proc(run_proc, kill_proc) ⇒ Object
- #initialize(lock, conf) ⇒ Engine (constructor): A new instance of Engine.
- #run ⇒ Object
- #set_sched(ident, action, cron) ⇒ Object: => (ident,action).
- #shutdown ⇒ Object
Constructor Details
#initialize(lock, conf) ⇒ Engine
Returns a new instance of Engine.
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/rsched/engine.rb', line 48
#
# Builds an engine around +lock+ and copies its tuning values out of +conf+.
#
# No defaults are applied except for +:from+ (falls back to 0); any other
# missing key leaves the corresponding instance variable nil.
#
# @param lock [Object] stored as @lock; #run calls +acquire(ident, time)+ on
#   it, and it is also handed to the TimerThread created here.
# @param conf [#[]] settings looked up by symbol key:
#   - +:delay+, +:resume+ -- subtracted from the current epoch time in
#     #run / #set_sched
#   - +:interval+ -- passed to +cond_wait+ by #run when a pass acquired nothing
#   - +:from+ -- passed to every Sched as its third argument (default 0)
#   - +:extend_timeout+, +:kill_timeout+, +:kill_retry+ -- forwarded to
#     TimerThread.new
#   - +:delete+, +:release_on_fail+ -- stored only; their consumers are not
#     visible in this listing
# @return [Engine] a new instance of Engine
def initialize(lock, conf)
  @lock = lock

  # Scheduling window and polling cadence.
  @resume = conf[:resume]
  @delay = conf[:delay]
  @interval = conf[:interval]
  @sched_start = conf[:from] || 0

  # Flags kept for code outside this listing.
  @delete = conf[:delete]
  @release_on_fail = conf[:release_on_fail]

  # Timer settings; kept on the engine and forwarded to the TimerThread.
  @extend_timeout = conf[:extend_timeout]
  @kill_timeout = conf[:kill_timeout]
  @kill_retry = conf[:kill_retry]
  @extender = TimerThread.new(@lock, @extend_timeout, @kill_timeout, @kill_retry)

  # Run-loop state: ident => Sched table, stop flag, and the mutex/condvar
  # pair that #shutdown broadcasts on.
  @ss = {}
  @finished = false
  @mutex = Mutex.new
  @cond = ConditionVariable.new
end
Instance Method Details
#init_proc(run_proc, kill_proc) ⇒ Object
74 75 76 77 |
# File 'lib/rsched/engine.rb', line 74
#
# Registers the two callbacks used by the engine.
#
# @param run_proc [Object] stored as @run_proc (its caller is not visible in
#   this listing -- presumably +process+; confirm against the full source)
# @param kill_proc [Object] forwarded to the extender via +init_proc+
# @return [Object] whatever +@extender.init_proc+ returns
def init_proc(run_proc, kill_proc)
  @run_proc = run_proc
  @extender.init_proc(kill_proc)
end
#run ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/rsched/engine.rb', line 79
#
# Main loop: starts the extender, then repeatedly tries to lock and process
# every queued time slot of every registered schedule until @finished is set.
#
# For each queued +time+ the result of +@lock.acquire(ident, time)+ decides
# what happens to the queue entry:
# - +nil+   -- already finished; the entry is dropped
# - +false+ -- not finished but already locked; the entry is kept
# - anything else is treated as a token: +process+ is called and its return
#   value decides whether the entry is dropped (truthy) or kept
#
# When a full pass acquired no token, the loop calls +cond_wait(@interval)+
# before the next pass.
#
# @return [nil]
def run
  @extender.start

  until @finished
    acquired_any = false
    sched_time = Time.now.to_i - @delay

    @ss.each_pair do |ident, s|
      s.sched(sched_time)

      s.queue.delete_if do |time|
        # Once shutdown was requested, leave the remaining entries untouched
        # (a bare `next` yields nil, so delete_if keeps them).
        next if @finished

        token = @lock.acquire(ident, time)
        case token
        when nil
          # already finished
          true
        when false
          # not finished but already locked
          false
        else
          acquired_any = true
          process(token, ident, time, s.action)
        end
      end

      break if @finished
    end

    return if @finished

    cond_wait(@interval) unless acquired_any
  end
end
#set_sched(ident, action, cron) ⇒ Object
=> (ident,action)
69 70 71 72 |
# File 'lib/rsched/engine.rb', line 69
#
# Registers (or replaces) the schedule stored under +ident+.
#
# The Sched is built from +cron+, +action+, @sched_start and two epoch-second
# values derived from the current time: +now - @delay - @resume+ and
# +now - @delay+.
# NOTE(review): these look like the lower/upper bound of the window to
# resume from -- confirm against Sched#initialize, which is not shown here.
#
# @param ident [Object] key under which the schedule is kept in @ss; #run
#   passes it to +@lock.acquire+
# @param action [Object] handed to Sched; #run reads it back via +s.action+
# @param cron [Object] schedule specification handed to Sched
# @return [Sched] the newly stored schedule
def set_sched(ident, action, cron)
  now = Time.now.to_i
  window_end = now - @delay
  window_start = window_end - @resume
  @ss[ident] = Sched.new(cron, action, @sched_start, window_start, window_end)
end
#shutdown ⇒ Object
119 120 121 122 123 124 125 |
# File 'lib/rsched/engine.rb', line 119
#
# Requests the run loop to stop: sets @finished, shuts the extender down and
# then broadcasts on @cond while holding @mutex.
# NOTE(review): the broadcast presumably wakes #run when it is blocked in
# +cond_wait+ -- that helper is not shown here, confirm it waits on @cond.
#
# @return [Object] the result of the synchronized broadcast
def shutdown
  @finished = true
  @extender.shutdown
  @mutex.synchronize do
    @cond.broadcast
  end
end