Class: PerformEvery::Scheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/perform_every/scheduler.rb

Instance Method Summary collapse

Instance Method Details

#run_forever ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/perform_every/scheduler.rb', line 5

# Runs the scheduler until the process receives an INT or TERM signal.
#
# After eager-loading the application code, the method repeatedly tries to
# obtain the advisory lock ADVISORY_LOCK_NAME (waiting up to 5 seconds per
# attempt). Inside the lock block it persists the locally defined jobs and
# then cycles over all non-deprecated Job records: a record that is still
# listed in Reflection.store is performed, any other record is marked
# deprecated. After an attempt that ends without a shutdown request the
# method pauses for roughly 30 seconds (15 x 2s) before trying again.
#
# The method only returns once a shutdown signal has been seen. The flag is
# checked after every processed job and before each 2-second sleep slice.
def run_forever
  Zeitwerk::Loader.eager_load_all # ensure every job class is loaded

  # INT and TERM merely raise a flag; the loops below poll it so the
  # method can leave cleanly instead of being interrupted mid-job.
  stop_requested = false
  %w[INT TERM].each do |signal_name|
    Signal.trap(signal_name) { stop_requested = true }
  end

  # Leader election: keep trying to take the advisory lock so that only one
  # worker at a time schedules jobs.
  loop do
    Rails.logger.info "Leader election: waiting to become master ..."
    ActiveRecord::Base.with_advisory_lock(ADVISORY_LOCK_NAME, timeout_seconds: 5) do
      Rails.logger.info "Leader election: I'm the master!"

      # store jobs found in the local files in the database
      local_jobs_count = Scheduler.persist_jobs
      Rails.logger.info "Found #{local_jobs_count} job/s in local files"

      stats = {}

      # on process exit, log whatever was counted since the last report
      at_exit do
        Rails.logger.info stats.to_s if stats.present?
        Rails.logger.info "Bye"
      end

      # scheduling loop; left only through the shutdown checks
      loop do
        Rails.logger.info "Running scheduler ..."

        active_jobs = Job.where(deprecated: false)
        active_jobs.each do |record|
          if Reflection.store.include?(record)
            # tally the result of perform! per returned value
            outcome = record.perform!
            stats[outcome] = stats.fetch(outcome, 0) + 1
          else
            # no longer present in the local job files
            record.mark_deprecated!
          end

          return if stop_requested
        end

        stats[:total_jobs] = active_jobs.count
        Rails.logger.info stats.to_s
        stats = {}

        # fewer rows in the database than jobs found locally means
        # something was not persisted yet
        known_jobs_count = active_jobs.count + Job.where(deprecated: true).count
        if local_jobs_count > known_jobs_count
          Rails.logger.warn "Unpersisted jobs found. Will retry to persist."
          Scheduler.persist_jobs
        end

        # sleep for SLEEP_INTERVAL in 2-second slices so a shutdown request
        # is noticed quickly
        (SLEEP_INTERVAL / 2).times do
          return if stop_requested

          sleep 2 # seconds
        end
      end
    end

    # wait 30 seconds before the next lock attempt, still polling the flag
    15.times do
      return if stop_requested

      sleep 2
    end
  end
end