Class: Scheduler::MainProcess
- Inherits:
-
Object
- Object
- Scheduler::MainProcess
- Defined in:
- lib/scheduler/main_process.rb
Instance Attribute Summary collapse
-
#job_class ⇒ Class
The class of the main job model.
-
#logger ⇒ String
A logger file.
-
#max_concurrent_jobs ⇒ Integer
Maximum number of concurrent jobs.
-
#pid ⇒ Integer
Pid of the main process.
-
#polling_interval ⇒ Integer
How much time to wait before each iteration.
Instance Method Summary collapse
-
#initialize ⇒ Scheduler::MainProcess
constructor
Creates a MainProcess which keeps running and continuously checks if new jobs are queued.
-
#reschedule_running_jobs ⇒ nil
Reschedules currently running jobs.
-
#start_loop ⇒ nil
Main loop.
Constructor Details
#initialize ⇒ Scheduler::MainProcess
Creates a MainProcess which keeps running and continuously checks if new jobs are queued.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/scheduler/main_process.rb', line 20

# Builds the main process and immediately enters the main loop.
#
# Loads Mongoid with the configured file and environment, copies the
# scheduler settings into instance state, validates them, prepares an empty
# job queue and then calls #start_loop. Because #start_loop is invoked from
# here, the constructor only returns once that loop has ended.
#
# @raise [RuntimeError] if the configured job class does not include
#   Scheduler::Schedulable
# @return [Scheduler::MainProcess]
def initialize
  settings = Scheduler.configuration
  Mongoid.load!(settings.mongoid_config_file, settings.environment)

  @logger = Scheduler.logger
  @pid = Process.pid
  @job_class = settings.job_class
  @polling_interval = settings.polling_interval
  @max_concurrent_jobs = settings.max_concurrent_jobs

  # A polling interval below one is not accepted: warn and fall back to 1.
  if @polling_interval < 1
    @logger.warn Rainbow("[Scheduler:#{@pid}] Warning: specified a polling interval lesser than 1: "\
      "it will be forced to 1.").yellow
    @polling_interval = 1
  end

  # The job model must mix in Scheduler::Schedulable.
  schedulable = @job_class.included_modules.include?(Scheduler::Schedulable)
  unless schedulable
    raise "The given job class '#{@job_class}' is not a Schedulable class. "\
      "Make sure to add 'include Scheduler::Schedulable' to your class."
  end

  # Ids of the jobs launched by this process.
  @queue = []

  @logger.info Rainbow("[Scheduler:#{@pid}] Starting main loop..").cyan
  start_loop
end
Instance Attribute Details
#job_class ⇒ Class
Returns the class of the main job model.
9 10 11 |
# File 'lib/scheduler/main_process.rb', line 9

# Reader for the class of the main job model.
#
# @return [Class] the value held in @job_class
def job_class
  @job_class
end
#logger ⇒ String
Returns a logger file.
7 8 9 |
# File 'lib/scheduler/main_process.rb', line 7

# Reader for the logger used by the main process.
#
# @return [Object] the value held in @logger
def logger
  @logger
end
#max_concurrent_jobs ⇒ Integer
Returns the maximum number of concurrent jobs.
13 14 15 |
# File 'lib/scheduler/main_process.rb', line 13

# Reader for the maximum number of concurrent jobs.
#
# @return [Integer] the value held in @max_concurrent_jobs
def max_concurrent_jobs
  @max_concurrent_jobs
end
#pid ⇒ Integer
Returns pid of the main process.
5 6 7 |
# File 'lib/scheduler/main_process.rb', line 5

# Reader for the pid of the main process.
#
# @return [Integer] the value held in @pid
def pid
  @pid
end
#polling_interval ⇒ Integer
Returns how much time to wait before each iteration.
11 12 13 |
# File 'lib/scheduler/main_process.rb', line 11

# Reader for how much time to wait before each iteration.
#
# @return [Integer] the value held in @polling_interval
def polling_interval
  @polling_interval
end
Instance Method Details
#reschedule_running_jobs ⇒ nil
Reschedules currently running jobs.
125 126 127 128 129 130 131 132 133 134 |
# File 'lib/scheduler/main_process.rb', line 125

# Puts every currently running job back into the schedule.
#
# For each job returned by the job class' `running` scope, a QUIT signal is
# sent to the job's process when a pid is recorded, and the job is then
# rescheduled regardless of how the kill attempt went.
def reschedule_running_jobs
  @job_class.running.each do |running_job|
    begin
      if running_job.pid.present?
        Process.kill(:QUIT, running_job.pid)
      end
    rescue Errno::ESRCH, Errno::EPERM
      # The process is already gone or cannot be signalled: nothing to stop.
    ensure
      # Runs on every path, including when the kill raised something else.
      running_job.schedule
    end
  end
end
#start_loop ⇒ nil
Main loop.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/scheduler/main_process.rb', line 50

# Main loop.
#
# Repeats forever: runs one scheduling iteration, then sleeps for
# @polling_interval. A StandardError raised by an iteration is logged and the
# loop carries on; the sleep still happens afterwards, so a failure that
# repeats on every pass (e.g. the database being unreachable) cannot turn the
# loop into a busy spin. The sleep lives inside the signal-handling `begin`
# so that a signal delivered while sleeping is still handled here.
#
# On a termination signal (see #termination_signal?) the running jobs are
# rescheduled and the loop ends. Any other SignalException is ignored.
#
# @return [nil]
def start_loop
  loop do
    begin
      begin
        run_iteration
      rescue StandardError => error
        @logger.error Rainbow("[Scheduler:#{@pid}] Error #{error.class}: #{error.message}").red
        @logger.error Rainbow(error.backtrace.join("\n")).red
      end
      # Waits the specified amount of time before next iteration.
      sleep @polling_interval
    rescue SignalException => signal
      if termination_signal?(signal)
        @logger.warn Rainbow("[Scheduler:#{@pid}] Received interrupt, terminating scheduler..").yellow
        reschedule_running_jobs
        break
      end
    end
  end
end

# Performs a single pass of the main loop: launches as many due jobs as the
# concurrency limit allows, logs the outcome, and drops finished jobs from
# the internal queue.
private def run_iteration
  # Counts jobs to schedule.
  running_jobs = @job_class.running.entries
  scheduled_jobs = @job_class.queued.order_by(scheduled_at: :asc).entries
  now = Time.now
  performable_jobs = scheduled_jobs.select { |job| job.run_at <= now }
  free_slots = [@max_concurrent_jobs - running_jobs.count, 0].max

  # Forks one child per job that is due and fits in a free slot.
  performed_jobs = performable_jobs.first(free_slots)
  performed_jobs.each do |job|
    job_pid = Process.fork do
      begin
        job.perform(Process.pid)
      rescue StandardError => e
        @logger.error Rainbow("[Scheduler:#{@pid}] Error #{e.class}: #{e.message}.").red
        @logger.error Rainbow(e.backtrace.join("\n")).red
      end
    end
    Process.detach(job_pid)
    @queue << job.id.to_s
  end

  # Logs launched jobs.
  if performed_jobs.any?
    @logger.info Rainbow("[Scheduler:#{@pid}] Launched #{performed_jobs.count} "\
      "jobs: #{performed_jobs.map(&:id).map(&:to_s).join(', ')}.").cyan
  elsif performable_jobs.count == 0
    @logger.info Rainbow("[Scheduler:#{@pid}] No jobs launched, "\
      "#{scheduled_jobs.count} in queue waiting to be performed.").cyan
  else
    @logger.warn Rainbow("[Scheduler:#{@pid}] No jobs launched, reached maximum "\
      "number of concurrent jobs. Jobs in queue: #{performable_jobs.count}.").yellow
  end

  # Checks for completed jobs: clears up queue and kills any zombie pid.
  @queue.delete_if do |job_id|
    # NOTE(review): depending on Mongoid's raise_not_found_error setting,
    # `find` may raise instead of returning nil for a deleted job — confirm.
    job = @job_class.find(job_id)
    next false unless job.present?
    next false if [:queued, :running].include?(job.status)

    begin
      @logger.info Rainbow("[Scheduler:#{@pid}] Removed process #{job.pid}, job is completed.").cyan
      # Only signal when a pid was recorded, as #reschedule_running_jobs does.
      Process.kill :QUIT, job.pid if job.pid.present?
    rescue Errno::ENOENT, Errno::ESRCH, Errno::EPERM
      # The process is already gone or cannot be signalled.
    end
    true
  end
end

# Tells whether a rescued signal should stop the scheduler.
#
# Interrupt is accepted by class as well as by message, so that an Interrupt
# whose message is not the literal signal name is still honoured.
#
# @param signal [SignalException] the rescued signal
# @return [Boolean] true for SIGINT, SIGTERM and SIGQUIT
private def termination_signal?(signal)
  signal.is_a?(Interrupt) || %w[SIGINT SIGTERM SIGQUIT].include?(signal.message)
end