Class: Scheduler::Manager
- Inherits: Object
- Inheritance hierarchy: Object → Scheduler::Manager
- Defined in:
- lib/scheduler/manager.rb
Defined Under Namespace
Classes: Runner
Instance Attribute Summary
- #random_ratio ⇒ Object: Returns the value of attribute random_ratio.
- #redis ⇒ Object: Returns the value of attribute redis.
Class Method Summary
- .current ⇒ Object
- .current=(manager) ⇒ Object
- .discover_schedules ⇒ Object
- .lock_key ⇒ Object
- .queue_key(hostname = nil) ⇒ Object
- .schedule_key(klass, hostname = nil) ⇒ Object
- .seq ⇒ Object
- .without_runner(redis = nil) ⇒ Object
Instance Method Summary
- #blocking_tick ⇒ Object
- #ensure_schedule!(klass) ⇒ Object
- #get_klass(name) ⇒ Object
- #hostname ⇒ Object
- #identity_key ⇒ Object
- #initialize(redis = nil, options = nil) ⇒ Manager (constructor): A new instance of Manager.
- #keep_alive ⇒ Object
- #keep_alive_duration ⇒ Object
- #lock ⇒ Object
- #next_run(klass) ⇒ Object
- #remove(klass) ⇒ Object
- #reschedule_orphans! ⇒ Object
- #reschedule_orphans_on!(hostname = nil) ⇒ Object
- #schedule_info(klass) ⇒ Object
- #schedule_next_job(hostname = nil) ⇒ Object
- #stop! ⇒ Object
- #tick ⇒ Object
Constructor Details
#initialize(redis = nil, options = nil) ⇒ Manager
Returns a new instance of Manager.
120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/scheduler/manager.rb', line 120 def initialize(redis = nil, options=nil) @redis = $redis || redis @random_ratio = 0.1 unless options && options[:skip_runner] @runner = Runner.new(self) self.class.current = self end @hostname = options && options[:hostname] @manager_id = SecureRandom.hex end |
Instance Attribute Details
#random_ratio ⇒ Object
Returns the value of attribute random_ratio.
11 12 13 |
# File 'lib/scheduler/manager.rb', line 11 def random_ratio @random_ratio end |
#redis ⇒ Object
Returns the value of attribute redis.
11 12 13 |
# File 'lib/scheduler/manager.rb', line 11 def redis @redis end |
Class Method Details
.current ⇒ Object
132 133 134 |
# File 'lib/scheduler/manager.rb', line 132 def self.current @current end |
.current=(manager) ⇒ Object
136 137 138 |
# File 'lib/scheduler/manager.rb', line 136 def self.current=(manager) @current = manager end |
.discover_schedules ⇒ Object
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 |
# File 'lib/scheduler/manager.rb', line 255 def self.discover_schedules # hack for development reloader is crazytown # multiple classes with same name can be in # object space unique = Set.new schedules = [] ObjectSpace.each_object(Scheduler::Schedule) do |schedule| if schedule.scheduled? next if unique.include?(schedule.to_s) schedules << schedule unique << schedule.to_s end end schedules end |
.lock_key ⇒ Object
283 284 285 |
# File 'lib/scheduler/manager.rb', line 283 def self.lock_key "_scheduler_lock_" end |
.queue_key(hostname = nil) ⇒ Object
287 288 289 290 291 292 293 |
# File 'lib/scheduler/manager.rb', line 287 def self.queue_key(hostname=nil) if hostname "_scheduler_queue_#{hostname}_" else "_scheduler_queue_" end end |
.schedule_key(klass, hostname = nil) ⇒ Object
295 296 297 298 299 300 301 |
# File 'lib/scheduler/manager.rb', line 295 def self.schedule_key(klass,hostname=nil) if hostname "_scheduler_#{klass}_#{hostname}" else "_scheduler_#{klass}" end end |
.seq ⇒ Object
272 273 274 275 276 277 |
# File 'lib/scheduler/manager.rb', line 272 def self.seq @mutex.synchronize do @i ||= 0 @i += 1 end end |
.without_runner(redis = nil) ⇒ Object
116 117 118 |
# File 'lib/scheduler/manager.rb', line 116 def self.without_runner(redis=nil) self.new(redis, skip_runner: true) end |
Instance Method Details
#blocking_tick ⇒ Object
230 231 232 233 |
# File 'lib/scheduler/manager.rb', line 230 def blocking_tick tick @runner.wait_till_done end |
#ensure_schedule!(klass) ⇒ Object
152 153 154 155 156 157 |
# File 'lib/scheduler/manager.rb', line 152 def ensure_schedule!(klass) lock do schedule_info(klass).schedule! end end |
#get_klass(name) ⇒ Object
187 188 189 190 191 |
# File 'lib/scheduler/manager.rb', line 187 def get_klass(name) name.constantize rescue NameError nil end |
#hostname ⇒ Object
140 141 142 |
# File 'lib/scheduler/manager.rb', line 140 def hostname @hostname ||= `hostname`.strip end |
#identity_key ⇒ Object
279 280 281 |
# File 'lib/scheduler/manager.rb', line 279 def identity_key @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}" end |
#keep_alive ⇒ Object
244 245 246 |
# File 'lib/scheduler/manager.rb', line 244 def keep_alive redis.setex identity_key, keep_alive_duration, "" end |
#keep_alive_duration ⇒ Object
240 241 242 |
# File 'lib/scheduler/manager.rb', line 240 def keep_alive_duration 60 end |
#lock ⇒ Object
248 249 250 251 252 |
# File 'lib/scheduler/manager.rb', line 248 def lock DistributedMutex.new(Manager.lock_key).synchronize do yield end end |
#next_run(klass) ⇒ Object
148 149 150 |
# File 'lib/scheduler/manager.rb', line 148 def next_run(klass) schedule_info(klass).next_run end |
#remove(klass) ⇒ Object
159 160 161 162 163 |
# File 'lib/scheduler/manager.rb', line 159 def remove(klass) lock do schedule_info(klass).del! end end |
#reschedule_orphans! ⇒ Object
165 166 167 168 169 170 |
# File 'lib/scheduler/manager.rb', line 165 def reschedule_orphans! lock do reschedule_orphans_on! reschedule_orphans_on!(hostname) end end |
#reschedule_orphans_on!(hostname = nil) ⇒ Object
172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/scheduler/manager.rb', line 172 def reschedule_orphans_on!(hostname=nil) redis.zrange(Manager.queue_key(hostname), 0, -1).each do |key| klass = get_klass(key) next unless klass info = schedule_info(klass) if ['QUEUED', 'RUNNING'].include?(info.prev_result) && (info.current_owner.blank? || !redis.get(info.current_owner)) info.prev_result = 'ORPHAN' info.next_run = Time.now.to_i info.write! end end end |
#schedule_info(klass) ⇒ Object
144 145 146 |
# File 'lib/scheduler/manager.rb', line 144 def schedule_info(klass) ScheduleInfo.new(klass, self) end |
#schedule_next_job(hostname = nil) ⇒ Object
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/scheduler/manager.rb', line 200 def schedule_next_job(hostname=nil) (key, due), _ = redis.zrange Manager.queue_key(hostname), 0, 0, withscores: true return unless key if due.to_i <= Time.now.to_i klass = get_klass(key) unless klass # corrupt key, nuke it (renamed job or something) redis.zrem Manager.queue_key(hostname), key return end unless klass.respond_to?(:daily) && klass.respond_to?(:every) # job klass exists but no longer extends from the base redis.zrem Manager.queue_key(hostname), key return end info = schedule_info(klass) info.prev_run = Time.now.to_i info.prev_result = "QUEUED" info.prev_duration = -1 info.next_run = nil info.current_owner = identity_key info.schedule! @runner.enq(klass) end end |
#stop! ⇒ Object
235 236 237 238 |
# File 'lib/scheduler/manager.rb', line 235 def stop! @runner.stop! self.class.current = nil end |
#tick ⇒ Object
193 194 195 196 197 198 |
# File 'lib/scheduler/manager.rb', line 193 def tick lock do schedule_next_job schedule_next_job(hostname) end end |