Class: Delayed::Master::JobChecker
- Inherits:
-
Object
- Object
- Delayed::Master::JobChecker
- Includes:
- Sleep
- Defined in:
- lib/delayed/master/job_checker.rb
Instance Method Summary
-
#initialize(master) ⇒ JobChecker
constructor
A new instance of JobChecker.
- #schedule(databases) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #start_checker_thread(database) ⇒ Object
- #start_scheduler_thread ⇒ Object
- #start_timer_thread(database, run_at) ⇒ Object
- #stop ⇒ Object
- #wait ⇒ Object
Methods included from Sleep
Constructor Details
#initialize(master) ⇒ JobChecker
Returns a new instance of JobChecker.
13 14 15 16 17 18 19 20 21 22 |
# File 'lib/delayed/master/job_checker.rb', line 13
# Builds a checker bound to +master+.
#
# Copies the master's config, databases and callbacks into instance
# variables, creates one Queue per database, and prepares two empty thread
# registries plus a JobFinder.
#
# @param master [Object] must respond to #config, #databases and #callbacks
def initialize(master)
  @master    = master
  @config    = master.config
  @databases = master.databases
  @callbacks = master.callbacks

  # One queue per database, keyed by the database entry itself.
  @queues = @databases.each_with_object({}) do |database, queues|
    queues[database] = Queue.new
  end

  @threads       = SafeArray.new
  @timer_threads = SafeArray.new
  @job_finder    = JobFinder.new(master)
end
Instance Method Details
#schedule(databases) ⇒ Object
75 76 77 78 79 80 |
# File 'lib/delayed/master/job_checker.rb', line 75
# Requests a check for each given database by pushing the database onto its
# own queue. Nothing is pushed when that queue already holds an entry, so
# pending requests do not pile up.
#
# @param databases [Object, Array] a single database or a list of them;
#   coerced with Array(), so nil results in no work
# @return [Array] the coerced list that was iterated
def schedule(databases)
  Array(databases).each do |database|
    queue = @queues[database]
    queue.push(database) if queue.empty?
  end
end
#shutdown ⇒ Object
87 88 89 90 |
# File 'lib/delayed/master/job_checker.rb', line 87
# Kills every thread in @threads and then every thread in @timer_threads.
# It does not wait for them to finish.
def shutdown
  @threads.each { |thread| thread.kill }
  @timer_threads.each { |timer| timer.kill }
end
#start ⇒ Object
24 25 26 27 28 29 |
# File 'lib/delayed/master/job_checker.rb', line 24
# Starts the scheduler thread followed by one checker thread per database,
# registering each of them in @threads.
#
# Threads are appended in place with `<<`. Using `@threads += [...]` would
# rebind @threads to whatever `+` returns instead of keeping the registry
# object that was created in the constructor.
# NOTE(review): SafeArray is not shown here; `<<` is the only mutator this
# method relies on.
#
# @return [Object] the @threads registry
def start
  @threads << start_scheduler_thread
  @databases.each do |database|
    @threads << start_checker_thread(database)
  end
  @threads
end
#start_checker_thread(database) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/delayed/master/job_checker.rb', line 44
# Spawns the worker thread that serves the queue of +database+.
#
# The thread blocks on the queue; each popped entry triggers one `check`
# wrapped in the :polling callback. Popping :stop ends the loop.
#
# @param database [Object] key into @queues
# @return [Thread] the started thread
def start_checker_thread(database)
  Thread.new(database) do |db|
    loop do
      break if @queues[db].pop == :stop

      @callbacks.call(:polling, @master, db) { check(db) }
    end
  end
end
#start_scheduler_thread ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/delayed/master/job_checker.rb', line 31
# Spawns the thread that periodically requests a check of all databases.
#
# On every iteration it first looks at the master: once the master reports
# stop?, the checker queues are told to stop and the loop is left.
# Otherwise a check is scheduled whenever the yielded value equals 0.
# NOTE(review): loop_with_sleep comes from the Sleep mixin, which is not
# shown here. Presumably it yields a counter while sleeping for the polling
# interval. Confirm.
#
# @return [Thread] the started thread
def start_scheduler_thread
  Thread.new do
    loop_with_sleep(@config.polling_interval) do |tick|
      if @master.stop?
        stop
        break
      end

      schedule(@databases) if tick == 0
    end
  end
end
#start_timer_thread(database, run_at) ⇒ Object
58 59 60 61 62 63 64 65 |
# File 'lib/delayed/master/job_checker.rb', line 58
# Spawns a one-shot thread that schedules a check of +database+ once
# +run_at+ has been reached, and registers it in @timer_threads.
#
# The thread sleeps only when +run_at+ lies in the future, and removes
# itself from @timer_threads after scheduling.
#
# @param database [Object] database to schedule
# @param run_at [#to_f] point in time at which the check should be requested
# @return [Object] whatever @timer_threads#<< returns
def start_timer_thread(database, run_at)
  timer = Thread.new(database, run_at) do |db, due_at|
    delay = due_at.to_f - Time.zone.now.to_f
    sleep(delay) if delay.positive?
    schedule(db)
    @timer_threads.delete(Thread.current)
  end
  @timer_threads << timer
end
#stop ⇒ Object
67 68 69 70 71 72 73 |
# File 'lib/delayed/master/job_checker.rb', line 67
# Tells every checker queue to stop: pending entries are discarded and a
# single :stop marker is pushed, which is the value the checker loop breaks
# on.
#
# @return [Object] the @databases collection that was iterated
def stop
  @databases.each do |database|
    @queues[database].tap do |pending|
      pending.clear
      pending.push(:stop)
    end
  end
end
#wait ⇒ Object
82 83 84 85 |
# File 'lib/delayed/master/job_checker.rb', line 82
# Blocks until every thread in @threads, and then every thread in
# @timer_threads, has finished.
def wait
  @threads.each { |thread| thread.join }
  @timer_threads.each { |timer| timer.join }
end