Class: PerformEvery::Scheduler
- Inherits:
-
Object
- Object
- PerformEvery::Scheduler
- Defined in:
- lib/perform_every/scheduler.rb
Class Method Summary collapse
- .cleanup_deprecated_jobs ⇒ Object
-
.persist_jobs ⇒ Object
insert new jobs to database.
- .reset_jobs ⇒ Object
Instance Method Summary collapse
Class Method Details
.cleanup_deprecated_jobs ⇒ Object
92 93 94 |
# File 'lib/perform_every/scheduler.rb', line 92 def self.cleanup_deprecated_jobs Job.where(:deprecated => true).delete_all end |
.persist_jobs ⇒ Object
insert new jobs to database
82 83 84 85 86 87 88 89 90 |
# File 'lib/perform_every/scheduler.rb', line 82 def self.persist_jobs return 0 if Reflection.store.blank? Job.insert_all(Reflection.store.map{|j| { job_name: j.job_name, typ: j.typ, value: j.value, perform_at: j.perform_next_at} }) Reflection.store.count end |
.reset_jobs ⇒ Object
96 97 98 |
# File 'lib/perform_every/scheduler.rb', line 96 def self.reset_jobs Job.connection.truncate(Job.table_name) end |
Instance Method Details
#run_forever ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/perform_every/scheduler.rb', line 6 def run_forever Zeitwerk::Loader.eager_load_all # make sure all jobs are loaded # trap SIGINT and SIGTERM signals for clean shutdown kill = false Signal.trap("INT") {|s| kill = true } Signal.trap("TERM") {|s| kill = true } # try to continuously acquire advisory lock so that only one worker # at a time will schedule jobs. wait 5 seconds for lock, then try again after 30 seconds. loop do Rails.logger.info "Leader election: waiting to become master ..." ActiveRecord::Base.with_advisory_lock(ADVISORY_LOCK_NAME, timeout_seconds: 5) do Rails.logger.info "Leader election: I'm the master!" # persist new jobs in the database local_jobs_count = Scheduler.persist_jobs Rails.logger.info "Found #{local_jobs_count} job/s in local files" metrics = {} at_exit do Rails.logger.info "#{metrics}" unless metrics.blank? Rails.logger.info "Bye" end # start endless loop loop do Rails.logger.info "Running scheduler ..." # handle all jobs and schedule job if it's about time jobs = Job.where(:deprecated => false) jobs.each do |job| # check if job is still present in local job files if Reflection.store.include?(job) op = job.perform! metrics[op] ||= 0 metrics[op] += 1 else job.mark_deprecated! end return if kill end metrics[:total_jobs] = jobs.count Rails.logger.info "#{metrics}" metrics = {} if local_jobs_count > jobs.count + Job.where(:deprecated => true).count Rails.logger.warn "Unpersisted jobs found. Will retry to persist." Scheduler.persist_jobs end # go sleeping for SLEEP_INTERVAL and keep watching for kill commands (SLEEP_INTERVAL / 2).times do return if kill sleep 2 # seconds end end end # sleep for 30 seconds, keep watching for kill commands 15.times do return if kill sleep 2 end end # /loop around with_advisory_lock end |