Class: Delayed::Master::JobChecker

Inherits:
Object
  • Object
show all
Includes:
Sleep
Defined in:
lib/delayed/master/job_checker.rb

Instance Method Summary collapse

Methods included from Sleep

#loop_with_sleep

Constructor Details

#initialize(master) ⇒ JobChecker

Returns a new instance of JobChecker.



13
14
15
16
17
18
19
20
21
22
# File 'lib/delayed/master/job_checker.rb', line 13

# Builds a job checker bound to +master+.
#
# Keeps references to the master's config, databases and callbacks,
# allocates one Queue per database (keyed by the database object),
# creates two empty SafeArray registries for worker and timer threads,
# and instantiates a JobFinder for the same master.
#
# @param master [Object] must respond to +config+, +databases+ and +callbacks+
def initialize(master)
  @master = master
  @config = master.config
  @databases = master.databases
  @callbacks = master.callbacks
  # One signalling queue per database; checker threads block on these.
  @queues = @databases.each_with_object({}) do |database, queues|
    queues[database] = Queue.new
  end
  @threads = SafeArray.new
  @timer_threads = SafeArray.new
  @job_finder = JobFinder.new(master)
end

Instance Method Details

#schedule(databases) ⇒ Object



75
76
77
78
79
80
# File 'lib/delayed/master/job_checker.rb', line 75

# Requests a check for each given database.
#
# A database is pushed onto its own queue only when that queue is
# currently empty, so at most one pending request is added per call.
#
# @param databases [Object, Array] a single database or a list of them
#   (coerced with Kernel#Array)
# @return [Array] the coerced list that was iterated
def schedule(databases)
  Array(databases).each do |database|
    pending = @queues[database]
    pending << database if pending.empty?
  end
end

#shutdown ⇒ Object



87
88
89
90
# File 'lib/delayed/master/job_checker.rb', line 87

# Kills every registered worker thread and timer thread.
#
# Timer threads remove themselves from @timer_threads when they finish
# (see #start_timer_thread), so the timer list is copied before iterating:
# a removal that happens mid-iteration would otherwise shift the remaining
# entries and could cause one to be skipped.
#
# @return [Array<Thread>] the timer threads that were sent +kill+
def shutdown
  @threads.each(&:kill)
  # +to_a.dup+ always yields an independent plain Array snapshot.
  @timer_threads.to_a.dup.each(&:kill)
end

#start ⇒ Object



24
25
26
27
28
29
# File 'lib/delayed/master/job_checker.rb', line 24

# Starts the scheduler thread and one checker thread per database,
# registering each of them in @threads.
#
# Threads are appended in place rather than with <tt>@threads += ...</tt>:
# +=+ rebinds @threads to the result of +#+, which for an Array subclass
# is a plain Array, so the registry object created in the constructor
# would be replaced. (NOTE(review): SafeArray is defined elsewhere;
# presumably it is an Array subclass — confirm.) Appending one by one also
# keeps already-started threads registered if a later start raises.
#
# @return [Object] the thread registry (@threads)
def start
  @threads << start_scheduler_thread
  @databases.each do |database|
    @threads << start_checker_thread(database)
  end
  @threads
end

#start_checker_thread(database) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/delayed/master/job_checker.rb', line 44

# Spawns the worker thread that services one database's queue.
#
# The thread blocks on the database's queue. Each popped item triggers
# +check+ for that database, wrapped in the +:polling+ callback; popping
# the +:stop+ sentinel ends the loop and the thread.
#
# @param database [Object] key into @queues identifying the database
# @return [Thread] the started thread
def start_checker_thread(database)
  Thread.new(database) do |db|
    loop do
      # Blocks until something is scheduled; :stop is the shutdown signal.
      break if @queues[db].pop == :stop

      @callbacks.call(:polling, @master, db) { check(db) }
    end
  end
end

#start_scheduler_thread ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/delayed/master/job_checker.rb', line 31

# Spawns the thread that periodically schedules checks for all databases.
#
# The block runs under +loop_with_sleep+ with the configured polling
# interval. On every yield it first asks the master whether to stop; if
# so it calls +stop+ and leaves the loop. Otherwise it schedules all
# databases whenever the yielded counter equals 0.
#
# @return [Thread] the started thread
def start_scheduler_thread
  Thread.new do
    loop_with_sleep(@config.polling_interval) do |tick|
      if @master.stop?
        stop
        break
      end

      schedule(@databases) if tick == 0
    end
  end
end

#start_timer_thread(database, run_at) ⇒ Object



58
59
60
61
62
63
64
65
# File 'lib/delayed/master/job_checker.rb', line 58

# Spawns a one-shot thread that schedules +database+ once +run_at+ is
# reached, and registers the thread in @timer_threads.
#
# The thread sleeps for the time remaining until +run_at+ (not at all if
# that moment has already passed), calls +schedule+, and finally removes
# itself from @timer_threads.
#
# NOTE(review): the thread is registered only after Thread.new returns.
# If it were to finish before the +<<+ below runs, its own +delete+ would
# find nothing and the finished thread would stay in the list — confirm
# whether that matters to callers.
#
# @param database [Object] database to schedule
# @param run_at [#to_f] moment at which the database should be scheduled
# @return [Object] whatever <tt>@timer_threads << thread</tt> returns
def start_timer_thread(database, run_at)
  timer = Thread.new(database, run_at) do |db, due_at|
    delay = due_at.to_f - Time.zone.now.to_f
    sleep(delay) if delay > 0
    schedule(db)
    @timer_threads.delete(Thread.current)
  end
  @timer_threads << timer
end

#stop ⇒ Object



67
68
69
70
71
72
73
# File 'lib/delayed/master/job_checker.rb', line 67

# Signals every checker thread to exit.
#
# For each database, any pending items are discarded from its queue and
# the +:stop+ sentinel is enqueued in their place.
#
# @return [Object] the receiver of +each+, i.e. @databases
def stop
  @databases.each do |database|
    # Empty the queue first so :stop is the next item popped.
    @queues[database].tap(&:clear) << :stop
  end
end

#wait ⇒ Object



82
83
84
85
# File 'lib/delayed/master/job_checker.rb', line 82

# Blocks until every registered worker thread and timer thread has
# finished.
#
# Each timer thread deletes itself from @timer_threads as its last step
# (see #start_timer_thread). Joining over the live list would therefore
# mutate it during iteration: each removal shifts the remaining entries
# and the next one can be skipped, letting +wait+ return while timer
# threads are still running. Joining a snapshot avoids that.
#
# @return [Array<Thread>] the timer threads that were joined
def wait
  @threads.each(&:join)
  # +to_a.dup+ always yields an independent plain Array snapshot.
  @timer_threads.to_a.dup.each(&:join)
end