Class: MiniScheduler::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/mini_scheduler/manager.rb

Defined Under Namespace

Classes: Runner

Constant Summary collapse

@@identity_key_mutex =
Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = nil) ⇒ Manager

Returns a new instance of Manager.



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/mini_scheduler/manager.rb', line 228

def initialize(options = nil)
  @queue = options && options[:queue] || "default"
  @workers = options && options[:workers] || 1
  @redis = MiniScheduler.redis
  @random_ratio = 0.1
  unless options && options[:skip_runner]
    @runner = Runner.new(self)
    self.class.current[@queue] = self
  end

  @hostname = options && options[:hostname]
  @manager_id = SecureRandom.hex

  if options && options.key?(:enable_stats)
    @enable_stats = options[:enable_stats]
  else
    @enable_stats = !!defined?(MiniScheduler::Stat)
  end
end

Instance Attribute Details

#enable_statsObject

Returns the value of attribute enable_stats.



5
6
7
# File 'lib/mini_scheduler/manager.rb', line 5

def enable_stats
  @enable_stats
end

#queueObject

Returns the value of attribute queue.



5
6
7
# File 'lib/mini_scheduler/manager.rb', line 5

def queue
  @queue
end

#random_ratioObject

Returns the value of attribute random_ratio.



5
6
7
# File 'lib/mini_scheduler/manager.rb', line 5

def random_ratio
  @random_ratio
end

#redisObject

Returns the value of attribute redis.



5
6
7
# File 'lib/mini_scheduler/manager.rb', line 5

def redis
  @redis
end

#workersObject

Returns the value of attribute workers.



5
6
7
# File 'lib/mini_scheduler/manager.rb', line 5

def workers
  @workers
end

Class Method Details

.currentObject



248
249
250
# File 'lib/mini_scheduler/manager.rb', line 248

def self.current
  @current ||= {}
end

.discover_queuesObject



370
371
372
373
374
# File 'lib/mini_scheduler/manager.rb', line 370

def self.discover_queues
  queues = Set.new
  ObjectSpace.each_object(MiniScheduler::Schedule).each { |schedule| queues << schedule.queue }
  queues
end

.discover_running_scheduled_jobsArray<Hash>

Discover running scheduled jobs on the current host.

Examples:


MiniScheduler::Manager.discover_running_scheduled_jobs

Parameters:

  • job (Hash)

    a customizable set of options

Returns:

  • (Array<Hash>)

    an array of hashes representing the running scheduled jobs.



417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
# File 'lib/mini_scheduler/manager.rb', line 417

def self.discover_running_scheduled_jobs
  hostname = self.hostname

  schedule_keys =
    discover_schedules.reduce({}) do |acc, klass|
      acc[klass] = if klass.is_per_host
        self.schedule_key(klass, hostname)
      else
        self.schedule_key(klass)
      end

      acc
    end

  running_scheduled_jobs = []

  schedule_keys
    .keys
    .zip(MiniScheduler.redis.mget(*schedule_keys.values))
    .each do |scheduled_job_class, scheduled_job_info|
      next if scheduled_job_info.nil?

      parsed =
        begin
          JSON.parse(scheduled_job_info, symbolize_names: true)
        rescue JSON::ParserError
          nil
        end

      next if parsed.nil?
      next if parsed[:prev_result] != "RUNNING"

      running_scheduled_jobs << {
        class: scheduled_job_class,
        started_at: Time.at(parsed[:prev_run]),
        thread_id: parsed[:current_owner],
      }
    end

  running_scheduled_jobs
end

.discover_schedulesObject



376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
# File 'lib/mini_scheduler/manager.rb', line 376

def self.discover_schedules
  # hack for developemnt reloader is crazytown
  # multiple classes with same name can be in
  # object space
  unique = Set.new
  schedules = []
  ObjectSpace.each_object(MiniScheduler::Schedule) do |schedule|
    if schedule.scheduled?
      next if unique.include?(schedule.to_s)
      schedules << schedule
      unique << schedule.to_s
    end
  end
  schedules
end

.hostnameObject



392
393
394
395
396
397
398
399
400
401
402
403
404
# File 'lib/mini_scheduler/manager.rb', line 392

def self.hostname
  @hostname ||=
    begin
      require "socket"
      Socket.gethostname
    rescue => e
      begin
        `hostname`.strip
      rescue => e
        "unknown_host"
      end
    end
end

.lock_key(queue) ⇒ Object



476
477
478
# File 'lib/mini_scheduler/manager.rb', line 476

def self.lock_key(queue)
  "_scheduler_lock_#{queue}_"
end

.queue_key(queue, hostname = nil) ⇒ Object



480
481
482
# File 'lib/mini_scheduler/manager.rb', line 480

def self.queue_key(queue, hostname = nil)
  hostname ? "_scheduler_queue_#{queue}_#{hostname}_" : "_scheduler_queue_#{queue}_"
end

.schedule_key(klass, hostname = nil) ⇒ Object



484
485
486
# File 'lib/mini_scheduler/manager.rb', line 484

def self.schedule_key(klass, hostname = nil)
  hostname ? "_scheduler_#{klass}_#{hostname}" : "_scheduler_#{klass}"
end

.seqObject



460
461
462
463
464
465
# File 'lib/mini_scheduler/manager.rb', line 460

def self.seq
  @class_mutex.synchronize do
    @i ||= 0
    @i += 1
  end
end

.without_runnerObject



224
225
226
# File 'lib/mini_scheduler/manager.rb', line 224

def self.without_runner
  self.new(skip_runner: true)
end

Instance Method Details

#blocking_tickObject



345
346
347
348
# File 'lib/mini_scheduler/manager.rb', line 345

def blocking_tick
  tick
  @runner.wait_till_done
end

#ensure_schedule!(klass) ⇒ Object



264
265
266
# File 'lib/mini_scheduler/manager.rb', line 264

def ensure_schedule!(klass)
  lock { schedule_info(klass).schedule! }
end

#get_klass(name) ⇒ Object



296
297
298
299
300
# File 'lib/mini_scheduler/manager.rb', line 296

def get_klass(name)
  name.constantize
rescue NameError
  nil
end

#hostnameObject



252
253
254
# File 'lib/mini_scheduler/manager.rb', line 252

def hostname
  @hostname ||= self.class.hostname
end

#identity_keyObject



468
469
470
471
472
473
474
# File 'lib/mini_scheduler/manager.rb', line 468

def identity_key
  return @identity_key if @identity_key
  @@identity_key_mutex.synchronize do
    @identity_key ||=
      "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
  end
end

#keep_alive(*ids) ⇒ Object



359
360
361
362
# File 'lib/mini_scheduler/manager.rb', line 359

def keep_alive(*ids)
  ids = [identity_key, *@runner.worker_thread_ids] if ids.size == 0
  ids.each { |identity_key| redis.setex identity_key, keep_alive_duration, "" }
end

#keep_alive_durationObject



355
356
357
# File 'lib/mini_scheduler/manager.rb', line 355

def keep_alive_duration
  60
end

#lockObject



364
365
366
367
368
# File 'lib/mini_scheduler/manager.rb', line 364

def lock
  MiniScheduler::DistributedMutex.synchronize(Manager.lock_key(queue), MiniScheduler.redis) do
    yield
  end
end

#next_run(klass) ⇒ Object



260
261
262
# File 'lib/mini_scheduler/manager.rb', line 260

def next_run(klass)
  schedule_info(klass).next_run
end

#remove(klass) ⇒ Object



268
269
270
# File 'lib/mini_scheduler/manager.rb', line 268

def remove(klass)
  lock { schedule_info(klass).del! }
end

#repair_queueObject



302
303
304
305
306
307
308
309
310
311
312
313
# File 'lib/mini_scheduler/manager.rb', line 302

def repair_queue
  if redis.exists?(self.class.queue_key(queue)) ||
       redis.exists?(self.class.queue_key(queue, hostname))
    return
  end

  self
    .class
    .discover_schedules
    .select { |schedule| schedule.queue == queue }
    .each { |schedule| ensure_schedule!(schedule) }
end

#reschedule_orphans!Object



272
273
274
275
276
277
# File 'lib/mini_scheduler/manager.rb', line 272

def reschedule_orphans!
  lock do
    reschedule_orphans_on!
    reschedule_orphans_on!(hostname)
  end
end

#reschedule_orphans_on!(hostname = nil) ⇒ Object



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/mini_scheduler/manager.rb', line 279

def reschedule_orphans_on!(hostname = nil)
  redis
    .zrange(Manager.queue_key(queue, hostname), 0, -1)
    .each do |key|
      klass = get_klass(key)
      next unless klass
      info = schedule_info(klass)

      if %w[QUEUED RUNNING].include?(info.prev_result) &&
           (!info.current_owner || !redis.get(info.current_owner))
        info.prev_result = "ORPHAN"
        info.next_run = Time.now.to_i
        info.write!
      end
    end
end

#schedule_info(klass) ⇒ Object



256
257
258
# File 'lib/mini_scheduler/manager.rb', line 256

def schedule_info(klass)
  MiniScheduler::ScheduleInfo.new(klass, self)
end

#schedule_next_job(hostname = nil) ⇒ Object



322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# File 'lib/mini_scheduler/manager.rb', line 322

def schedule_next_job(hostname = nil)
  (key, due), _ = redis.zrange Manager.queue_key(queue, hostname), 0, 0, withscores: true
  return unless key

  if due.to_i <= Time.now.to_i
    klass = get_klass(key)
    if !klass || ((klass.is_per_host && !hostname) || (hostname && !klass.is_per_host))
      # corrupt key, nuke it (renamed job or something)
      redis.zrem Manager.queue_key(queue, hostname), key
      return
    end

    info = schedule_info(klass)
    info.prev_run = Time.now.to_i
    info.prev_result = "QUEUED"
    info.prev_duration = -1
    info.next_run = nil
    info.current_owner = identity_key
    info.schedule!
    @runner.enq(klass)
  end
end

#stop!Object



350
351
352
353
# File 'lib/mini_scheduler/manager.rb', line 350

def stop!
  @runner.stop!
  self.class.current.delete(@queue)
end

#tickObject



315
316
317
318
319
320
# File 'lib/mini_scheduler/manager.rb', line 315

def tick
  lock do
    schedule_next_job
    schedule_next_job(hostname)
  end
end