Class: Resque::Scheduler
- Inherits:
-
Object
- Object
- Resque::Scheduler
- Extended by:
- Helpers
- Defined in:
- lib/resque/scheduler.rb
Class Attribute Summary collapse
-
.dynamic ⇒ Object
If set, will try to update the schulde in the loop.
-
.mute ⇒ Object
If set, produces no output.
-
.verbose ⇒ Object
If true, logs more stuff…
Class Method Summary collapse
-
.clear_schedule! ⇒ Object
Stops old rufus scheduler and creates a new one.
-
.enqueue_delayed_items_for_timestamp(timestamp) ⇒ Object
Enqueues all delayed jobs for a timestamp.
-
.enqueue_from_config(config) ⇒ Object
Enqueues a job based on a config hash.
-
.handle_delayed_items(at_time = nil) ⇒ Object
Handles queueing delayed items at_time - Time to start scheduling items (default: now).
- .handle_shutdown ⇒ Object
-
.load_schedule! ⇒ Object
Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance.
-
.load_schedule_job(name, config) ⇒ Object
Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs.
- .log(msg) ⇒ Object
- .log!(msg) ⇒ Object
-
.poll_sleep ⇒ Object
Sleeps and returns true.
- .procline(string) ⇒ Object
-
.rails_env_matches?(config) ⇒ Boolean
Returns true if the given schedule config hash matches the current ENV.
-
.register_signal_handlers ⇒ Object
For all signals, set the shutdown flag and wait for current poll/enqueing to finish (should be almost istant).
- .reload_schedule! ⇒ Object
- .rufus_scheduler ⇒ Object
-
.run ⇒ Object
Schedule all jobs and continually look for delayed jobs (never returns).
-
.scheduled_jobs ⇒ Object
the Rufus::Scheduler jobs that are scheduled.
-
.shutdown ⇒ Object
Sets the shutdown flag, exits if sleeping.
- .unschedule_job(name) ⇒ Object
- .update_schedule ⇒ Object
Class Attribute Details
.dynamic ⇒ Object
If set, will try to update the schulde in the loop
19 20 21 |
# File 'lib/resque/scheduler.rb', line 19 def dynamic @dynamic end |
.mute ⇒ Object
If set, produces no output
16 17 18 |
# File 'lib/resque/scheduler.rb', line 16 def mute @mute end |
.verbose ⇒ Object
If true, logs more stuff…
13 14 15 |
# File 'lib/resque/scheduler.rb', line 13 def verbose @verbose end |
Class Method Details
.clear_schedule! ⇒ Object
Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler
174 175 176 177 178 179 |
# File 'lib/resque/scheduler.rb', line 174 def clear_schedule! rufus_scheduler.stop @rufus_scheduler = nil @@scheduled_jobs = {} rufus_scheduler end |
.enqueue_delayed_items_for_timestamp(timestamp) ⇒ Object
Enqueues all delayed jobs for a timestamp
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/resque/scheduler.rb', line 126 def () item = nil begin handle_shutdown do if item = Resque.() log "queuing #{item['class']} [delayed]" queue = item['queue'] || Resque.queue_from_class(constantize(item['class'])) # Support custom job classes like job with status if (job_klass = item['custom_job_class']) && (job_klass != 'Resque::Job') # custom job classes not supporting the same API calls must implement the #schedule method constantize(job_klass).scheduled(queue, item['class'], *item['args']) else klass, args = item['class'], item['args'] Resque.enqueue(constantize(klass), *args) end end end # continue processing until there are no more ready items in this timestamp end while !item.nil? end |
.enqueue_from_config(config) ⇒ Object
Enqueues a job based on a config hash
154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/resque/scheduler.rb', line 154 def enqueue_from_config(config) args = config['args'] || config[:args] klass_name = config['class'] || config[:class] params = args.is_a?(Hash) ? [args] : Array(args) queue = config['queue'] || config[:queue] || Resque.queue_from_class(constantize(klass_name)) # Support custom job classes like job with status if (job_klass = config['custom_job_class']) && (job_klass != 'Resque::Job') # custom job classes not supporting the same API calls must implement the #schedule method constantize(job_klass).scheduled(queue, klass_name, *params) else Resque.enqueue(constantize(klass_name), *params) end end |
.handle_delayed_items(at_time = nil) ⇒ Object
Handles queueing delayed items at_time - Time to start scheduling items (default: now).
114 115 116 117 118 119 120 121 122 123 |
# File 'lib/resque/scheduler.rb', line 114 def handle_delayed_items(at_time=nil) item = nil if = Resque.(at_time) procline "Processing Delayed Items" while !.nil? () = Resque.(at_time) end end end |
.handle_shutdown ⇒ Object
147 148 149 150 151 |
# File 'lib/resque/scheduler.rb', line 147 def handle_shutdown exit if @shutdown yield exit if @shutdown end |
.load_schedule! ⇒ Object
Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance
64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/resque/scheduler.rb', line 64 def load_schedule! log! "Schedule empty! Set Resque.schedule" if Resque.schedule.empty? @@scheduled_jobs = {} Resque.schedule.each do |name, config| load_schedule_job(name, config) end Resque.schedules_changed.remove procline "Schedules Loaded" end |
.load_schedule_job(name, config) ⇒ Object
Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/resque/scheduler.rb', line 77 def load_schedule_job(name, config) # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as # required for the jobs to be scheduled. If rails_env is missing, the # job should be scheduled regardless of what ENV['RAILS_ENV'] is set # to. if config['rails_env'].nil? || rails_env_matches?(config) log! "Scheduling #{name} " interval_defined = false interval_types = %w{cron every} interval_types.each do |interval_type| if !config[interval_type].nil? && config[interval_type].length > 0 begin @@scheduled_jobs[name] = rufus_scheduler.send(interval_type, config[interval_type]) do log! "queueing #{config['class']} (#{name})" enqueue_from_config(config) end rescue Exception => e log! "#{e.class.name}: #{e.}" end interval_defined = true break end end unless interval_defined log! "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping" end end end |
.log(msg) ⇒ Object
230 231 232 233 |
# File 'lib/resque/scheduler.rb', line 230 def log(msg) # add "verbose" logic later log!(msg) if verbose end |
.log!(msg) ⇒ Object
226 227 228 |
# File 'lib/resque/scheduler.rb', line 226 def log!(msg) puts "#{Time.now.strftime("%Y-%m-%d %H:%M:%S")} #{msg}" unless mute end |
.poll_sleep ⇒ Object
Sleeps and returns true
213 214 215 216 217 218 |
# File 'lib/resque/scheduler.rb', line 213 def poll_sleep @sleeping = true handle_shutdown { sleep 5 } @sleeping = false true end |
.procline(string) ⇒ Object
235 236 237 238 |
# File 'lib/resque/scheduler.rb', line 235 def procline(string) $0 = "resque-mongo-scheduler-#{ResqueScheduler::Version}: #{string}" log! $0 end |
.rails_env_matches?(config) ⇒ Boolean
Returns true if the given schedule config hash matches the current ENV
108 109 110 |
# File 'lib/resque/scheduler.rb', line 108 def rails_env_matches?(config) config['rails_env'] && ENV['RAILS_ENV'] && config['rails_env'].gsub(/\s/,'').split(',').include?(ENV['RAILS_ENV']) end |
.register_signal_handlers ⇒ Object
For all signals, set the shutdown flag and wait for current poll/enqueing to finish (should be almost istant). In the case of sleeping, exit immediately.
49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/resque/scheduler.rb', line 49 def register_signal_handlers trap("TERM") { shutdown } trap("INT") { shutdown } begin trap('QUIT') { shutdown } trap('USR1') { kill_child } trap('USR2') { reload_schedule! } rescue ArgumentError warn "Signals QUIT and USR1 and USR2 not supported." end end |
.reload_schedule! ⇒ Object
181 182 183 184 185 186 |
# File 'lib/resque/scheduler.rb', line 181 def reload_schedule! procline "Reloading Schedule" clear_schedule! Resque.reload_schedule! load_schedule! end |
.rufus_scheduler ⇒ Object
168 169 170 |
# File 'lib/resque/scheduler.rb', line 168 def rufus_scheduler @rufus_scheduler ||= Rufus::Scheduler.start_new end |
.run ⇒ Object
Schedule all jobs and continually look for delayed jobs (never returns)
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/resque/scheduler.rb', line 27 def run $0 = "resque-mongo-scheduler: Starting" # trap signals register_signal_handlers # Load the schedule into rufus procline "Loading Schedule" load_schedule! # Now start the scheduling part of the loop. loop do handle_delayed_items update_schedule if dynamic poll_sleep end # never gets here. end |
.scheduled_jobs ⇒ Object
the Rufus::Scheduler jobs that are scheduled
22 23 24 |
# File 'lib/resque/scheduler.rb', line 22 def scheduled_jobs @@scheduled_jobs end |
.shutdown ⇒ Object
Sets the shutdown flag, exits if sleeping
221 222 223 224 |
# File 'lib/resque/scheduler.rb', line 221 def shutdown @shutdown = true exit if @sleeping end |
.unschedule_job(name) ⇒ Object
204 205 206 207 208 209 210 |
# File 'lib/resque/scheduler.rb', line 204 def unschedule_job(name) if scheduled_jobs[name] log "Removing schedule #{name}" scheduled_jobs[name].unschedule @@scheduled_jobs.delete(name) end end |
.update_schedule ⇒ Object
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/resque/scheduler.rb', line 188 def update_schedule if Resque.schedules_changed.count > 0 procline "Updating schedule" Resque.reload_schedule! Resque.pop_schedules_changed do |schedule_name| if Resque.schedule.keys.include?(schedule_name) unschedule_job(schedule_name) load_schedule_job(schedule_name, Resque.schedule[schedule_name]) else unschedule_job(schedule_name) end end procline "Schedules Loaded" end end |