Class: MiniScheduler::Manager
- Inherits:
-
Object
- Object
- MiniScheduler::Manager
- Defined in:
- lib/mini_scheduler/manager.rb
Defined Under Namespace
Classes: Runner
Constant Summary collapse
- @@identity_key_mutex =
Mutex.new
Instance Attribute Summary collapse
-
#enable_stats ⇒ Object
Returns the value of attribute enable_stats.
-
#queue ⇒ Object
Returns the value of attribute queue.
-
#random_ratio ⇒ Object
Returns the value of attribute random_ratio.
-
#redis ⇒ Object
Returns the value of attribute redis.
-
#workers ⇒ Object
Returns the value of attribute workers.
Class Method Summary collapse
- .current ⇒ Object
- .discover_queues ⇒ Object
-
.discover_running_scheduled_jobs ⇒ Array<Hash>
Discover running scheduled jobs on the current host.
- .discover_schedules ⇒ Object
- .hostname ⇒ Object
- .lock_key(queue) ⇒ Object
- .queue_key(queue, hostname = nil) ⇒ Object
- .schedule_key(klass, hostname = nil) ⇒ Object
- .seq ⇒ Object
- .without_runner ⇒ Object
Instance Method Summary collapse
- #blocking_tick ⇒ Object
- #ensure_schedule!(klass) ⇒ Object
- #get_klass(name) ⇒ Object
- #hostname ⇒ Object
- #identity_key ⇒ Object
-
#initialize(options = nil) ⇒ Manager
constructor
A new instance of Manager.
- #keep_alive(*ids) ⇒ Object
- #keep_alive_duration ⇒ Object
- #lock ⇒ Object
- #next_run(klass) ⇒ Object
- #remove(klass) ⇒ Object
- #repair_queue ⇒ Object
- #reschedule_orphans! ⇒ Object
- #reschedule_orphans_on!(hostname = nil) ⇒ Object
- #schedule_info(klass) ⇒ Object
- #schedule_next_job(hostname = nil) ⇒ Object
- #stop! ⇒ Object
- #tick ⇒ Object
Constructor Details
#initialize(options = nil) ⇒ Manager
Returns a new instance of Manager.
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 |
# File 'lib/mini_scheduler/manager.rb', line 228 def initialize( = nil) @queue = && [:queue] || "default" @workers = && [:workers] || 1 @redis = MiniScheduler.redis @random_ratio = 0.1 unless && [:skip_runner] @runner = Runner.new(self) self.class.current[@queue] = self end @hostname = && [:hostname] @manager_id = SecureRandom.hex if && .key?(:enable_stats) @enable_stats = [:enable_stats] else @enable_stats = !!defined?(MiniScheduler::Stat) end end |
Instance Attribute Details
#enable_stats ⇒ Object
Returns the value of attribute enable_stats.
5 6 7 |
# File 'lib/mini_scheduler/manager.rb', line 5 def enable_stats @enable_stats end |
#queue ⇒ Object
Returns the value of attribute queue.
5 6 7 |
# File 'lib/mini_scheduler/manager.rb', line 5 def queue @queue end |
#random_ratio ⇒ Object
Returns the value of attribute random_ratio.
5 6 7 |
# File 'lib/mini_scheduler/manager.rb', line 5 def random_ratio @random_ratio end |
#redis ⇒ Object
Returns the value of attribute redis.
5 6 7 |
# File 'lib/mini_scheduler/manager.rb', line 5 def redis @redis end |
#workers ⇒ Object
Returns the value of attribute workers.
5 6 7 |
# File 'lib/mini_scheduler/manager.rb', line 5 def workers @workers end |
Class Method Details
.current ⇒ Object
248 249 250 |
# File 'lib/mini_scheduler/manager.rb', line 248 def self.current @current ||= {} end |
.discover_queues ⇒ Object
370 371 372 373 374 |
# File 'lib/mini_scheduler/manager.rb', line 370 def self.discover_queues queues = Set.new ObjectSpace.each_object(MiniScheduler::Schedule).each { |schedule| queues << schedule.queue } queues end |
.discover_running_scheduled_jobs ⇒ Array<Hash>
Discover running scheduled jobs on the current host.
417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 |
# File 'lib/mini_scheduler/manager.rb', line 417 def self.discover_running_scheduled_jobs hostname = self.hostname schedule_keys = discover_schedules.reduce({}) do |acc, klass| acc[klass] = if klass.is_per_host self.schedule_key(klass, hostname) else self.schedule_key(klass) end acc end running_scheduled_jobs = [] schedule_keys .keys .zip(MiniScheduler.redis.mget(*schedule_keys.values)) .each do |scheduled_job_class, scheduled_job_info| next if scheduled_job_info.nil? parsed = begin JSON.parse(scheduled_job_info, symbolize_names: true) rescue JSON::ParserError nil end next if parsed.nil? next if parsed[:prev_result] != "RUNNING" running_scheduled_jobs << { class: scheduled_job_class, started_at: Time.at(parsed[:prev_run]), thread_id: parsed[:current_owner], } end running_scheduled_jobs end |
.discover_schedules ⇒ Object
376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 |
# File 'lib/mini_scheduler/manager.rb', line 376 def self.discover_schedules # hack for developemnt reloader is crazytown # multiple classes with same name can be in # object space unique = Set.new schedules = [] ObjectSpace.each_object(MiniScheduler::Schedule) do |schedule| if schedule.scheduled? next if unique.include?(schedule.to_s) schedules << schedule unique << schedule.to_s end end schedules end |
.hostname ⇒ Object
392 393 394 395 396 397 398 399 400 401 402 403 404 |
# File 'lib/mini_scheduler/manager.rb', line 392 def self.hostname @hostname ||= begin require "socket" Socket.gethostname rescue => e begin `hostname`.strip rescue => e "unknown_host" end end end |
.lock_key(queue) ⇒ Object
476 477 478 |
# File 'lib/mini_scheduler/manager.rb', line 476 def self.lock_key(queue) "_scheduler_lock_#{queue}_" end |
.queue_key(queue, hostname = nil) ⇒ Object
480 481 482 |
# File 'lib/mini_scheduler/manager.rb', line 480 def self.queue_key(queue, hostname = nil) hostname ? "_scheduler_queue_#{queue}_#{hostname}_" : "_scheduler_queue_#{queue}_" end |
.schedule_key(klass, hostname = nil) ⇒ Object
484 485 486 |
# File 'lib/mini_scheduler/manager.rb', line 484 def self.schedule_key(klass, hostname = nil) hostname ? "_scheduler_#{klass}_#{hostname}" : "_scheduler_#{klass}" end |
.seq ⇒ Object
460 461 462 463 464 465 |
# File 'lib/mini_scheduler/manager.rb', line 460 def self.seq @class_mutex.synchronize do @i ||= 0 @i += 1 end end |
.without_runner ⇒ Object
224 225 226 |
# File 'lib/mini_scheduler/manager.rb', line 224 def self.without_runner self.new(skip_runner: true) end |
Instance Method Details
#blocking_tick ⇒ Object
345 346 347 348 |
# File 'lib/mini_scheduler/manager.rb', line 345 def blocking_tick tick @runner.wait_till_done end |
#ensure_schedule!(klass) ⇒ Object
264 265 266 |
# File 'lib/mini_scheduler/manager.rb', line 264 def ensure_schedule!(klass) lock { schedule_info(klass).schedule! } end |
#get_klass(name) ⇒ Object
296 297 298 299 300 |
# File 'lib/mini_scheduler/manager.rb', line 296 def get_klass(name) name.constantize rescue NameError nil end |
#hostname ⇒ Object
252 253 254 |
# File 'lib/mini_scheduler/manager.rb', line 252 def hostname @hostname ||= self.class.hostname end |
#identity_key ⇒ Object
468 469 470 471 472 473 474 |
# File 'lib/mini_scheduler/manager.rb', line 468 def identity_key return @identity_key if @identity_key @@identity_key_mutex.synchronize do @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}" end end |
#keep_alive(*ids) ⇒ Object
359 360 361 362 |
# File 'lib/mini_scheduler/manager.rb', line 359 def keep_alive(*ids) ids = [identity_key, *@runner.worker_thread_ids] if ids.size == 0 ids.each { |identity_key| redis.setex identity_key, keep_alive_duration, "" } end |
#keep_alive_duration ⇒ Object
355 356 357 |
# File 'lib/mini_scheduler/manager.rb', line 355 def keep_alive_duration 60 end |
#lock ⇒ Object
364 365 366 367 368 |
# File 'lib/mini_scheduler/manager.rb', line 364 def lock MiniScheduler::DistributedMutex.synchronize(Manager.lock_key(queue), MiniScheduler.redis) do yield end end |
#next_run(klass) ⇒ Object
260 261 262 |
# File 'lib/mini_scheduler/manager.rb', line 260 def next_run(klass) schedule_info(klass).next_run end |
#remove(klass) ⇒ Object
268 269 270 |
# File 'lib/mini_scheduler/manager.rb', line 268 def remove(klass) lock { schedule_info(klass).del! } end |
#repair_queue ⇒ Object
302 303 304 305 306 307 308 309 310 311 312 313 |
# File 'lib/mini_scheduler/manager.rb', line 302 def repair_queue if redis.exists?(self.class.queue_key(queue)) || redis.exists?(self.class.queue_key(queue, hostname)) return end self .class .discover_schedules .select { |schedule| schedule.queue == queue } .each { |schedule| ensure_schedule!(schedule) } end |
#reschedule_orphans! ⇒ Object
272 273 274 275 276 277 |
# File 'lib/mini_scheduler/manager.rb', line 272 def reschedule_orphans! lock do reschedule_orphans_on! reschedule_orphans_on!(hostname) end end |
#reschedule_orphans_on!(hostname = nil) ⇒ Object
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 |
# File 'lib/mini_scheduler/manager.rb', line 279 def reschedule_orphans_on!(hostname = nil) redis .zrange(Manager.queue_key(queue, hostname), 0, -1) .each do |key| klass = get_klass(key) next unless klass info = schedule_info(klass) if %w[QUEUED RUNNING].include?(info.prev_result) && (!info.current_owner || !redis.get(info.current_owner)) info.prev_result = "ORPHAN" info.next_run = Time.now.to_i info.write! end end end |
#schedule_info(klass) ⇒ Object
256 257 258 |
# File 'lib/mini_scheduler/manager.rb', line 256 def schedule_info(klass) MiniScheduler::ScheduleInfo.new(klass, self) end |
#schedule_next_job(hostname = nil) ⇒ Object
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 |
# File 'lib/mini_scheduler/manager.rb', line 322 def schedule_next_job(hostname = nil) (key, due), _ = redis.zrange Manager.queue_key(queue, hostname), 0, 0, withscores: true return unless key if due.to_i <= Time.now.to_i klass = get_klass(key) if !klass || ((klass.is_per_host && !hostname) || (hostname && !klass.is_per_host)) # corrupt key, nuke it (renamed job or something) redis.zrem Manager.queue_key(queue, hostname), key return end info = schedule_info(klass) info.prev_run = Time.now.to_i info.prev_result = "QUEUED" info.prev_duration = -1 info.next_run = nil info.current_owner = identity_key info.schedule! @runner.enq(klass) end end |
#stop! ⇒ Object
350 351 352 353 |
# File 'lib/mini_scheduler/manager.rb', line 350 def stop! @runner.stop! self.class.current.delete(@queue) end |
#tick ⇒ Object
315 316 317 318 319 320 |
# File 'lib/mini_scheduler/manager.rb', line 315 def tick lock do schedule_next_job schedule_next_job(hostname) end end |