Class: Scheduler::MainProcess

Inherits:
Object
  • Object
show all
Defined in:
lib/scheduler/main_process.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeScheduler::MainProcess

Creates a MainProcess which keeps running and continuously checks if new jobs are queued.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/scheduler/main_process.rb', line 20

# Builds the main scheduler process and immediately enters the polling loop.
#
# Loads the Mongoid configuration, copies the relevant settings from
# Scheduler.configuration into instance variables, validates them and then
# calls #start_loop, so this constructor only returns once the loop ends.
#
# @raise [RuntimeError] if the configured job class does not include
#   Scheduler::Schedulable.
def initialize
  config = Scheduler.configuration
  Mongoid.load!(config.mongoid_config_file, config.environment)

  @logger = Scheduler.logger
  @pid = Process.pid
  @job_class = config.job_class
  @polling_interval = config.polling_interval
  @max_concurrent_jobs = config.max_concurrent_jobs

  # Intervals below one are not accepted: warn and fall back to 1.
  if @polling_interval < 1
    warning = "[Scheduler:#{@pid}] Warning: specified a polling interval lesser than 1: "\
      "it will be forced to 1."
    logger.warn Rainbow(warning).yellow
    @polling_interval = 1
  end

  # The job model must mix in Scheduler::Schedulable to be usable here.
  schedulable = @job_class.included_modules.include?(Scheduler::Schedulable)
  unless schedulable
    raise "The given job class '#{@job_class}' is not a Schedulable class. "\
      "Make sure to add 'include Scheduler::Schedulable' to your class."
  end

  # Ids of the jobs launched by this process that are still being tracked.
  @queue = []

  @logger.info Rainbow("[Scheduler:#{@pid}] Starting main loop..").cyan
  start_loop
end

Instance Attribute Details

#job_classClass

Returns the class of the main job model.

Returns:

  • (Class)

    the class of the main job model.



9
10
11
# File 'lib/scheduler/main_process.rb', line 9

# Reader for @job_class.
#
# The constructor assigns it from Scheduler.configuration.job_class and
# raises unless that class includes Scheduler::Schedulable.
#
# @return [Class] the class of the main job model.
def job_class
  @job_class
end

#loggerString

Returns a logger file.

Returns:

  • (String)

    a logger file.



7
8
9
# File 'lib/scheduler/main_process.rb', line 7

# Reader for @logger.
#
# The constructor assigns it from Scheduler.logger; this class sends it
# #info, #warn and #error messages.
# NOTE(review): the generated docs declare the type as String ("a logger
# file"), but the object is used as a logger here - confirm the intended type.
def logger
  @logger
end

#max_concurrent_jobsInteger

Returns the maximum number of concurrent jobs.

Returns:

  • (Integer)

    the maximum number of concurrent jobs.



13
14
15
# File 'lib/scheduler/main_process.rb', line 13

# Reader for @max_concurrent_jobs.
#
# The constructor assigns it from Scheduler.configuration.max_concurrent_jobs;
# the main loop subtracts the number of running jobs from it to decide how
# many new jobs may be launched in an iteration.
#
# @return [Integer] maximum number of concurrent jobs.
def max_concurrent_jobs
  @max_concurrent_jobs
end

#pidInteger

Returns pid of the main process.

Returns:

  • (Integer)

    pid of the main process.



5
6
7
# File 'lib/scheduler/main_process.rb', line 5

# Reader for @pid.
#
# The constructor assigns it from Process.pid, i.e. the id of the process
# that created this object; it is used as a prefix in the log messages.
#
# @return [Integer] pid of the main process.
def pid
  @pid
end

#polling_intervalInteger

Returns how much time to wait before each iteration.

Returns:

  • (Integer)

    how much time to wait before each iteration.



11
12
13
# File 'lib/scheduler/main_process.rb', line 11

# Reader for @polling_interval.
#
# The constructor assigns it from Scheduler.configuration.polling_interval
# and forces it to 1 when the configured value is lower than 1. The main loop
# passes it to Kernel#sleep between iterations, so it is a number of seconds.
#
# @return [Integer] how much time to wait before each iteration.
def polling_interval
  @polling_interval
end

Instance Method Details

#reschedule_running_jobsnil

Reschedules currently running jobs.

Returns:

  • (nil)


125
126
127
128
129
130
131
132
133
134
# File 'lib/scheduler/main_process.rb', line 125

# Puts every job reported as running by the job class back on schedule.
#
# When a job has a pid, that process is first sent QUIT; a process that is
# already gone (ESRCH) or that we may not signal (EPERM) is ignored. The job's
# #schedule is invoked in every case, even if signalling failed.
#
# @return the collection yielded by +@job_class.running+ (result of #each).
def reschedule_running_jobs
  @job_class.running.each do |running_job|
    begin
      worker_pid = running_job.pid
      Process.kill(:QUIT, worker_pid) if worker_pid.present?
    rescue Errno::ESRCH, Errno::EPERM
      # Nothing to stop: fall through to rescheduling.
    ensure
      running_job.schedule
    end
  end
end

#start_loopnil

Main loop.

Returns:

  • (nil)


50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/scheduler/main_process.rb', line 50

# Main loop.
#
# Runs until an interrupt (SIGINT), SIGTERM or SIGQUIT is received. Every
# iteration launches the queued jobs that are due, stops tracking the jobs
# that have finished and then sleeps for @polling_interval seconds. When a
# termination signal arrives the running jobs are rescheduled and the loop
# ends.
#
# @return [nil]
def start_loop
  loop do
    begin
      begin
        launch_performable_jobs
        purge_completed_jobs
      rescue StandardError => error
        @logger.error Rainbow("[Scheduler:#{@pid}] Error #{error.class}: #{error.message}").red
        @logger.error Rainbow(error.backtrace.join("\n")).red
      end

      # Waits the specified amount of time before next iteration. This sits
      # outside the inner rescue so that a failing iteration also waits instead
      # of retrying in a tight loop, and inside the outer one so that a signal
      # received while sleeping is still handled below.
      sleep @polling_interval
    rescue SignalException => signal
      # Signals other than the termination ones are ignored; keep looping.
      next unless termination_signal?(signal)

      @logger.warn Rainbow("[Scheduler:#{@pid}] Received interrupt, terminating scheduler..").yellow
      reschedule_running_jobs
      break
    end
  end
end

# Forks one process per due job, up to the number of free concurrency slots,
# records the launched job ids in @queue and logs the outcome.
private def launch_performable_jobs
  running_jobs = @job_class.running.entries
  scheduled_jobs = @job_class.queued.order_by(scheduled_at: :asc).entries
  now = Time.now
  performable_jobs = scheduled_jobs.select { |job| job.run_at <= now }
  free_slots = [@max_concurrent_jobs - running_jobs.count, 0].max

  launched_jobs = performable_jobs.first(free_slots).each do |job|
    job_pid = Process.fork do
      begin
        job.perform(Process.pid)
      rescue StandardError => e
        @logger.error Rainbow("[Scheduler:#{@pid}] Error #{e.class}: #{e.message}.").red
        @logger.error Rainbow(e.backtrace.join("\n")).red
      end
    end
    # The parent never waits for the child, so detach it to avoid zombies.
    Process.detach(job_pid)
    @queue << job.id.to_s
  end

  if launched_jobs.any?
    @logger.info Rainbow("[Scheduler:#{@pid}] Launched #{launched_jobs.count} "\
      "jobs: #{launched_jobs.map(&:id).map(&:to_s).join(', ')}.").cyan
  elsif performable_jobs.empty?
    @logger.info Rainbow("[Scheduler:#{@pid}] No jobs launched, "\
      "#{scheduled_jobs.count} in queue waiting to be performed.").cyan
  else
    @logger.warn Rainbow("[Scheduler:#{@pid}] No jobs launched, reached maximum "\
      "number of concurrent jobs. Jobs in queue: #{performable_jobs.count}.").yellow
  end
end

# Removes from @queue the ids of jobs that are neither queued nor running
# (sending QUIT to their process, if any) and of jobs that no longer exist.
private def purge_completed_jobs
  @queue.delete_if do |job_id|
    job = @job_class.find(job_id)
    # A job that cannot be found any more will never change status: keeping
    # its id would make the queue grow forever.
    next true unless job.present?
    next false if %i[queued running].include?(job.status)

    @logger.info Rainbow("[Scheduler:#{@pid}] Removed process #{job.pid}, job is completed.").cyan
    begin
      # A job may have no pid recorded; Process.kill(nil) would raise TypeError.
      Process.kill :QUIT, job.pid if job.pid.present?
    rescue Errno::ENOENT, Errno::ESRCH, Errno::EPERM
      # The process is already gone or is not ours to signal.
    end
    true
  end
end

# Tells whether the given SignalException asks the scheduler to stop.
# The signal number is used rather than the exception message because the
# Interrupt raised on SIGINT carries an empty message.
private def termination_signal?(signal)
  %w[INT TERM QUIT].include?(Signal.signame(signal.signo))
end