Class: Resque::Scheduler
- Inherits: Object
  - Object
    - Resque::Scheduler
- Extended by: Helpers, SchedulerLocking
- Defined in: lib/resque/scheduler.rb
Class Attribute Summary
- .dynamic ⇒ Object
  If set, will try to update the schedule in the loop.
- .mute ⇒ Object
  If set, produces no output.
- .poll_sleep_amount ⇒ Object
- .verbose ⇒ Object
  If true, logs more stuff…
Class Method Summary
- .clear_schedule! ⇒ Object
  Stops old rufus scheduler and creates a new one.
- .enqueue_delayed_items_for_timestamp(timestamp) ⇒ Object
  Enqueues all delayed jobs for a timestamp.
- .enqueue_from_config(job_config) ⇒ Object
  Enqueues a job based on a config hash.
- .handle_delayed_items(at_time = nil) ⇒ Object
  Handles queueing delayed items. at_time - Time to start scheduling items (default: now).
- .handle_errors ⇒ Object
- .handle_shutdown ⇒ Object
- .load_schedule! ⇒ Object
  Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance.
- .load_schedule_job(name, config) ⇒ Object
  Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs.
- .log(msg) ⇒ Object
- .log!(msg) ⇒ Object
- .optionizate_interval_value(value) ⇒ Object
  Converts an interval value into a value with options, if options are given.
- .poll_sleep ⇒ Object
  Sleeps and returns true.
- .print_schedule ⇒ Object
- .procline(string) ⇒ Object
- .rails_env_matches?(config) ⇒ Boolean
  Returns true if the given schedule config hash matches the current ENV.
- .register_signal_handlers ⇒ Object
  For all signals, set the shutdown flag and wait for the current poll/enqueuing to finish (should be almost instant).
- .reload_schedule! ⇒ Object
- .rufus_scheduler ⇒ Object
- .run ⇒ Object
  Schedule all jobs and continually look for delayed jobs (never returns).
- .scheduled_jobs ⇒ Object
  The Rufus::Scheduler jobs that are scheduled.
- .shutdown ⇒ Object
  Sets the shutdown flag, exits if sleeping.
- .unschedule_job(name) ⇒ Object
- .update_schedule ⇒ Object
Methods included from SchedulerLocking
acquire_master_lock!, extend_lock!, has_master_lock?, hostname, is_master?, lock_timeout, lock_timeout=, master_lock_key, master_lock_value, process_id
Class Attribute Details
.dynamic ⇒ Object
If set, will try to update the schedule in the loop
    # File 'lib/resque/scheduler.rb', line 21
    def dynamic
      @dynamic
    end
.mute ⇒ Object
If set, produces no output
    # File 'lib/resque/scheduler.rb', line 18
    def mute
      @mute
    end
.poll_sleep_amount ⇒ Object
    # File 'lib/resque/scheduler.rb', line 32
    def poll_sleep_amount
      @poll_sleep_amount ||= 5 # seconds
    end
.verbose ⇒ Object
If true, logs more stuff…
    # File 'lib/resque/scheduler.rb', line 15
    def verbose
      @verbose
    end
Class Method Details
.clear_schedule! ⇒ Object
Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler
    # File 'lib/resque/scheduler.rb', line 244
    def clear_schedule!
      rufus_scheduler.stop
      @rufus_scheduler = nil
      @@scheduled_jobs = {}
      rufus_scheduler
    end
.enqueue_delayed_items_for_timestamp(timestamp) ⇒ Object
Enqueues all delayed jobs for a timestamp
    # File 'lib/resque/scheduler.rb', line 175
    def enqueue_delayed_items_for_timestamp(timestamp)
      item = nil
      begin
        handle_shutdown do
          # Continually check that it is still the master
          if is_master? && item = Resque.next_item_for_timestamp(timestamp)
            log "queuing #{item['class']} [delayed]"
            handle_errors { enqueue_from_config(item) }
          end
        end
      # continue processing until there are no more ready items in this timestamp
      end while !item.nil?
    end
.enqueue_from_config(job_config) ⇒ Object
Enqueues a job based on a config hash
    # File 'lib/resque/scheduler.rb', line 204
    def enqueue_from_config(job_config)
      args = job_config['args'] || job_config[:args]

      klass_name = job_config['class'] || job_config[:class]
      klass = constantize(klass_name) rescue klass_name

      params = args.is_a?(Hash) ? [args] : Array(args)
      queue = job_config['queue'] || job_config[:queue] || Resque.queue_from_class(klass)
      # Support custom job classes like those that inherit from Resque::JobWithStatus (resque-status)
      if (job_klass = job_config['custom_job_class']) && (job_klass != 'Resque::Job')
        # The custom job class API must offer a static "scheduled" method. If the custom
        # job class can not be constantized (via a requeue call from the web perhaps), fall
        # back to enqueuing normally via Resque::Job.create.
        begin
          constantize(job_klass).scheduled(queue, klass_name, *params)
        rescue NameError
          # Note that the custom job class (job_config['custom_job_class']) is the one enqueued
          Resque::Job.create(queue, job_klass, *params)
        end
      else
        # hack to avoid havoc for people shoving stuff into queues
        # for non-existent classes (for example: running scheduler in
        # one app that schedules for another)
        if Class === klass
          ResqueScheduler::Plugin.run_before_delayed_enqueue_hooks(klass, *params)
          Resque.enqueue_to(queue, klass, *params)
        else
          # This will not run the before_hooks in Resque, but will at least
          # queue the job.
          Resque::Job.create(queue, klass, *params)
        end
      end
    end
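The argument handling in enqueue_from_config can be tried in isolation. The sketch below is not part of the library: `normalize_params` is a hypothetical helper that copies only the string-or-symbol key lookup and the `args.is_a?(Hash) ? [args] : Array(args)` line, to show what ends up being splatted into the enqueue call.

```ruby
# Hypothetical helper, not part of resque-scheduler: mirrors how
# enqueue_from_config turns the 'args' entry into a parameter list.
def normalize_params(job_config)
  # Accept both string and symbol keys, as the original method does.
  args = job_config['args'] || job_config[:args]
  # A Hash is kept as one single argument; everything else goes through
  # Kernel#Array, so nil becomes [] and a scalar becomes a one-element array.
  args.is_a?(Hash) ? [args] : Array(args)
end

p normalize_params({ 'args' => nil })
p normalize_params({ 'args' => 'report' })
p normalize_params({ args: [1, 2] })
p normalize_params({ 'args' => { 'id' => 7 } })
```

The Hash case is handled separately because `Array(hash)` would split the hash into key/value pairs instead of passing it as one argument.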
.handle_delayed_items(at_time = nil) ⇒ Object
Handles queueing delayed items.
at_time - Time to start scheduling items (default: now).
    # File 'lib/resque/scheduler.rb', line 164
    def handle_delayed_items(at_time=nil)
      if timestamp = Resque.next_delayed_timestamp(at_time)
        procline "Processing Delayed Items"
        while !timestamp.nil?
          enqueue_delayed_items_for_timestamp(timestamp)
          timestamp = Resque.next_delayed_timestamp(at_time)
        end
      end
    end
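The two-level loop (find a due timestamp, drain every item under it, ask again) can be illustrated without Redis. In this sketch, `FakeDelayedQueue` and `drain_due_items` are hypothetical stand-ins written for illustration only. The real scheduler asks Resque for the next due timestamp and its items.

```ruby
# Hypothetical in-memory stand-in for the delayed queue kept in Redis.
class FakeDelayedQueue
  def initialize
    @items = Hash.new { |hash, ts| hash[ts] = [] } # timestamp => queued items
  end

  def push(timestamp, item)
    @items[timestamp] << item
  end

  # Earliest timestamp that is due at +at_time+, or nil if none is due.
  def next_due_timestamp(at_time)
    @items.keys.select { |ts| ts <= at_time }.min
  end

  # Removes and returns one item for +timestamp+; nil once it is drained.
  def next_item(timestamp)
    return nil unless @items.key?(timestamp)
    item = @items[timestamp].shift
    @items.delete(timestamp) if @items[timestamp].empty?
    item
  end
end

# Same shape as handle_delayed_items: keep asking for a due timestamp and
# drain every item stored under it before asking again.
def drain_due_items(queue, at_time)
  processed = []
  while (timestamp = queue.next_due_timestamp(at_time))
    while (item = queue.next_item(timestamp))
      processed << item
    end
  end
  processed
end

queue = FakeDelayedQueue.new
queue.push(100, 'a')
queue.push(100, 'b')
queue.push(50, 'c')
queue.push(200, 'later')
p drain_due_items(queue, 150) # 'later' stays queued because 200 > 150
```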
.handle_errors ⇒ Object
    # File 'lib/resque/scheduler.rb', line 195
    def handle_errors
      begin
        yield
      rescue Exception => e
        log! "#{e.class.name}: #{e.message}"
      end
    end
.handle_shutdown ⇒ Object
    # File 'lib/resque/scheduler.rb', line 189
    def handle_shutdown
      exit if @shutdown
      yield
      exit if @shutdown
    end
.load_schedule! ⇒ Object
Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance
    # File 'lib/resque/scheduler.rb', line 95
    def load_schedule!
      procline "Loading Schedule"

      # Need to load the schedule from redis for the first time if dynamic
      Resque.reload_schedule! if dynamic

      log! "Schedule empty! Set Resque.schedule" if Resque.schedule.empty?

      @@scheduled_jobs = {}

      Resque.schedule.each do |name, config|
        load_schedule_job(name, config)
      end
      Resque.redis.del(:schedules_changed)
      procline "Schedules Loaded"
    end
.load_schedule_job(name, config) ⇒ Object
Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs
    # File 'lib/resque/scheduler.rb', line 128
    def load_schedule_job(name, config)
      # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
      # required for the jobs to be scheduled. If rails_env is missing, the
      # job should be scheduled regardless of what ENV['RAILS_ENV'] is set
      # to.
      if config['rails_env'].nil? || rails_env_matches?(config)
        log! "Scheduling #{name} "
        interval_defined = false
        interval_types = %w{cron every}
        interval_types.each do |interval_type|
          if !config[interval_type].nil? && config[interval_type].length > 0
            args = optionizate_interval_value(config[interval_type])
            @@scheduled_jobs[name] = rufus_scheduler.send(interval_type, *args) do
              if is_master?
                log! "queueing #{config['class']} (#{name})"
                handle_errors { enqueue_from_config(config) }
              end
            end
            interval_defined = true
            break
          end
        end
        unless interval_defined
          log! "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping"
        end
      end
    end
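The interval selection in load_schedule_job checks 'cron' first and 'every' second, and stops at the first non-empty value. The following sketch isolates that rule. `pick_interval` is a hypothetical helper, and the job and class names in the example schedule are made up for illustration.

```ruby
# Hypothetical helper, not part of resque-scheduler: returns the first
# interval type ('cron' before 'every') that has a non-empty value.
def pick_interval(config)
  %w[cron every].each do |interval_type|
    value = config[interval_type]
    return [interval_type, value] if !value.nil? && value.length > 0
  end
  nil # neither key is set, so the job would be skipped
end

# Example schedule entries; the job and class names are made up.
schedule = {
  'nightly_report' => { 'cron' => '0 3 * * *', 'class' => 'ReportJob' },
  'heartbeat'      => { 'every' => '30s', 'class' => 'PingJob' },
  'both_set'       => { 'cron' => '*/5 * * * *', 'every' => '1h', 'class' => 'SyncJob' },
  'no_interval'    => { 'class' => 'OrphanJob' }
}

schedule.each do |name, config|
  p [name, pick_interval(config)]
end
```

When both keys are present, as in 'both_set', only the cron expression is used.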
.log(msg) ⇒ Object
    # File 'lib/resque/scheduler.rb', line 299
    def log(msg)
      # add "verbose" logic later
      log!(msg) if verbose
    end
.log!(msg) ⇒ Object
    # File 'lib/resque/scheduler.rb', line 295
    def log!(msg)
      puts "#{Time.now.strftime("%Y-%m-%d %H:%M:%S")} #{msg}" unless mute
    end
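To see the line format that log! produces, the timestamp prefix can be reproduced with a fixed time. `format_log_line` is a hypothetical helper used only for this illustration. The library method prints directly with `puts` and is silenced by `mute`, while `log` additionally requires `verbose`.

```ruby
# Hypothetical formatter mirroring the line that log! prints.
def format_log_line(msg, time = Time.now)
  "#{time.strftime('%Y-%m-%d %H:%M:%S')} #{msg}"
end

# A fixed UTC time keeps the example output stable.
puts format_log_line('Scheduling nightly_report', Time.utc(2024, 1, 2, 3, 4, 5))
# prints "2024-01-02 03:04:05 Scheduling nightly_report"
```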
.optionizate_interval_value(value) ⇒ Object
Converts an interval value into a value with options, if options are given
    # File 'lib/resque/scheduler.rb', line 113
    def optionizate_interval_value(value)
      args = value
      if args.is_a?(::Array)
        return args.first if args.size > 2 || !args.last.is_a?(::Hash)
        # symbolize keys of hash for options
        args[1] = args[1].inject({}) do |m, i|
          key, value = i
          m[(key.to_sym rescue key) || key] = value
          m
        end
      end
      args
    end
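The effect of this method is easiest to see on sample inputs. The sketch below is a standalone approximation for experimentation, not the library method. `normalize_interval` is a hypothetical name, it assumes the input is either a plain value or an `[interval, options]` style array, and unlike the original it does not modify the array passed in.

```ruby
# Standalone approximation for experimentation; not the library method.
def normalize_interval(value)
  return value unless value.is_a?(Array)
  # More than two elements, or no trailing options Hash: keep only the interval.
  return value.first if value.size > 2 || !value.last.is_a?(Hash)

  interval, options = value
  # Symbolize the option keys where possible.
  symbolized = options.each_with_object({}) do |(key, val), memo|
    memo[key.respond_to?(:to_sym) ? key.to_sym : key] = val
  end
  [interval, symbolized]
end

p normalize_interval('30s')
p normalize_interval(['30s'])
p normalize_interval(['30s', { 'first_in' => '5m' }])
p normalize_interval(['30s', { 'first_in' => '5m' }, 'extra'])
```

Only the two-element form with a trailing Hash keeps its options. Every other array collapses to its first element.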
.poll_sleep ⇒ Object
Sleeps and returns true
    # File 'lib/resque/scheduler.rb', line 282
    def poll_sleep
      @sleeping = true
      handle_shutdown { sleep poll_sleep_amount }
      @sleeping = false
      true
    end
.print_schedule ⇒ Object
    # File 'lib/resque/scheduler.rb', line 83
    def print_schedule
      if rufus_scheduler
        log! "Scheduling Info\tLast Run"
        scheduler_jobs = rufus_scheduler.all_jobs
        scheduler_jobs.each do |k, v|
          log! "#{v.t}\t#{v.last}\t"
        end
      end
    end
.procline(string) ⇒ Object
    # File 'lib/resque/scheduler.rb', line 304
    def procline(string)
      log! string
      $0 = "resque-scheduler-#{ResqueScheduler::VERSION}: #{string}"
    end
.rails_env_matches?(config) ⇒ Boolean
Returns true if the given schedule config hash matches the current ENV
    # File 'lib/resque/scheduler.rb', line 158
    def rails_env_matches?(config)
      config['rails_env'] && ENV['RAILS_ENV'] && config['rails_env'].gsub(/\s/,'').split(',').include?(ENV['RAILS_ENV'])
    end
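The matching rule treats 'rails_env' as a comma-separated list and ignores whitespace. A minimal sketch of that rule follows. `env_matches?` is a hypothetical variant that takes the environment name as an argument instead of reading ENV['RAILS_ENV'], and it returns a strict `false` where the original can return `nil`.

```ruby
# Hypothetical variant that takes the environment name as an argument
# instead of reading ENV['RAILS_ENV'], so it can be tried without Rails.
def env_matches?(config, current_env)
  return false unless config['rails_env'] && current_env
  # Strip all whitespace, split on commas, then look for an exact match.
  config['rails_env'].gsub(/\s/, '').split(',').include?(current_env)
end

config = { 'rails_env' => 'production, staging' }
p env_matches?(config, 'staging')
p env_matches?(config, 'development')
p env_matches?(config, nil)
```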
.register_signal_handlers ⇒ Object
For all signals, set the shutdown flag and wait for the current poll/enqueuing to finish (should be almost instant). In the case of sleeping, exit immediately.
    # File 'lib/resque/scheduler.rb', line 70
    def register_signal_handlers
      trap("TERM") { shutdown }
      trap("INT") { shutdown }

      begin
        trap('QUIT') { shutdown }
        trap('USR1') { print_schedule }
        trap('USR2') { reload_schedule! }
      rescue ArgumentError
        warn "Signals QUIT and USR1 and USR2 not supported."
      end
    end
.reload_schedule! ⇒ Object
    # File 'lib/resque/scheduler.rb', line 251
    def reload_schedule!
      procline "Reloading Schedule"
      clear_schedule!
      load_schedule!
    end
.rufus_scheduler ⇒ Object
    # File 'lib/resque/scheduler.rb', line 238
    def rufus_scheduler
      @rufus_scheduler ||= Rufus::Scheduler.start_new
    end
.run ⇒ Object
Schedule all jobs and continually look for delayed jobs (never returns)
    # File 'lib/resque/scheduler.rb', line 37
    def run
      $0 = "resque-scheduler: Starting"
      # trap signals
      register_signal_handlers

      # Load the schedule into rufus
      # If dynamic is set, load that schedule otherwise use normal load
      if dynamic
        reload_schedule!
      else
        load_schedule!
      end

      # Now start the scheduling part of the loop.
      loop do
        if is_master?
          begin
            handle_delayed_items
            update_schedule if dynamic
          rescue Errno::EAGAIN, Errno::ECONNRESET => e
            warn e.message
          end
        end
        poll_sleep
      end

      # never gets here.
    end
.scheduled_jobs ⇒ Object
The Rufus::Scheduler jobs that are scheduled
    # File 'lib/resque/scheduler.rb', line 28
    def scheduled_jobs
      @@scheduled_jobs
    end
.shutdown ⇒ Object
Sets the shutdown flag, exits if sleeping
    # File 'lib/resque/scheduler.rb', line 290
    def shutdown
      @shutdown = true
      exit if @sleeping
    end
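The interplay between the shutdown flag and handle_shutdown can be sketched in miniature. `TinyScheduler` below is a hypothetical class written for illustration, not the library itself. It keeps the same two checks around the block and relies on `exit` raising `SystemExit`, which the example rescues so the outcome can be printed.

```ruby
# Hypothetical miniature of the shutdown handshake; not the library class.
class TinyScheduler
  def initialize
    @shutdown = false
    @sleeping = false
  end

  # Mirrors .shutdown: set the flag, and leave at once if only sleeping.
  def shutdown
    @shutdown = true
    exit if @sleeping
  end

  # Mirrors .handle_shutdown: check the flag before and after the block,
  # so the block itself is never interrupted half-way.
  def handle_shutdown
    exit if @shutdown
    yield
    exit if @shutdown
  end
end

scheduler = TinyScheduler.new
work_done = []

begin
  scheduler.handle_shutdown { work_done << :first }
  scheduler.handle_shutdown do
    scheduler.shutdown     # e.g. a TERM signal arrives mid-work
    work_done << :second   # the block still runs to completion
  end
  work_done << :never_reached
rescue SystemExit
  puts "exited after #{work_done.inspect}"
end
```

The second block finishes its work before the process exits, which is the "wait for current poll/enqueuing to finish" behavior described for the signal handlers.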
.unschedule_job(name) ⇒ Object
    # File 'lib/resque/scheduler.rb', line 273
    def unschedule_job(name)
      if scheduled_jobs[name]
        log "Removing schedule #{name}"
        scheduled_jobs[name].unschedule
        @@scheduled_jobs.delete(name)
      end
    end
.update_schedule ⇒ Object
    # File 'lib/resque/scheduler.rb', line 257
    def update_schedule
      if Resque.redis.scard(:schedules_changed) > 0
        procline "Updating schedule"
        Resque.reload_schedule!
        while schedule_name = Resque.redis.spop(:schedules_changed)
          if Resque.schedule.keys.include?(schedule_name)
            unschedule_job(schedule_name)
            load_schedule_job(schedule_name, Resque.schedule[schedule_name])
          else
            unschedule_job(schedule_name)
          end
        end
        procline "Schedules Loaded"
      end
    end