Class: Resque::Scheduler
- Inherits:
-
Object
show all
- Extended by:
- SchedulerLocking
- Defined in:
- lib/resque/scheduler.rb,
lib/resque/scheduler/lock/base.rb,
lib/resque/scheduler/lock/basic.rb,
lib/resque/scheduler/lock/resilient.rb
Defined Under Namespace
Modules: Lock
Class Attribute Summary collapse
Class Method Summary
collapse
is_master?, master_lock, release_master_lock!, supports_lua?
Class Attribute Details
.app_name ⇒ Object
69
70
71
|
# File 'lib/resque/scheduler.rb', line 69
# Application name used by the scheduler.
#
# Returns the memoized value when one is already set; otherwise reads (and
# caches) the APP_NAME environment variable, which may be nil.
def app_name
  return @app_name if @app_name
  @app_name = ENV['APP_NAME']
end
|
.dynamic ⇒ Object
62
63
64
|
# File 'lib/resque/scheduler.rb', line 62
# Whether the schedule is dynamic.
#
# Defaults to true when the DYNAMIC_SCHEDULE environment variable is set.
# The environment is consulted only while @dynamic is still nil, so a value
# assigned explicitly -- including `false` -- is respected. (With `||=` a
# stored `false` was re-evaluated on every call and silently overridden by
# the environment.)
#
# Returns true or false.
def dynamic
  @dynamic = !!ENV['DYNAMIC_SCHEDULE'] if @dynamic.nil?
  @dynamic
end
|
.env ⇒ Object
24
25
26
27
28
29
|
# File 'lib/resque/scheduler.rb', line 24
# Current environment name.
#
# Resolution order: an already-set @env, then Rails.env when the Rails
# constant is defined, then the RAILS_ENV environment variable. The result
# is cached in @env.
#
# Returns the environment name, or nil when none could be determined.
def env
  unless @env
    @env = Rails.env if defined?(Rails)
    @env ||= ENV['RAILS_ENV']
  end
  @env
end
|
.logfile ⇒ Object
48
49
50
|
# File 'lib/resque/scheduler.rb', line 48
# Log destination handed to the logger builder.
#
# Returns the memoized value, falling back to (and caching) the LOGFILE
# environment variable, which may be nil.
def logfile
  return @logfile if @logfile
  @logfile = ENV['LOGFILE']
end
|
.logformat ⇒ Object
55
56
57
|
# File 'lib/resque/scheduler.rb', line 55
# Log format handed to the logger builder.
#
# Returns the memoized value, falling back to (and caching) the LOGFORMAT
# environment variable, which may be nil.
def logformat
  return @logformat if @logformat
  @logformat = ENV['LOGFORMAT']
end
|
.logger ⇒ Object
84
85
86
87
88
89
90
91
|
# File 'lib/resque/scheduler.rb', line 84
# Logger used by the log helpers.
#
# Built once through ResqueScheduler::LoggerBuilder from the current
# mute / verbose / logfile / logformat settings, then memoized.
def logger
  return @logger if @logger
  builder = ResqueScheduler::LoggerBuilder.new(
    :mute => mute, :verbose => verbose, :log_dev => logfile, :format => logformat
  )
  @logger = builder.build
end
|
.mute ⇒ Object
41
42
43
|
# File 'lib/resque/scheduler.rb', line 41
# Whether logging is muted.
#
# Defaults to true when the MUTE environment variable is set. The
# environment is consulted only while @mute is still nil, so an explicitly
# assigned `false` is respected instead of being re-evaluated (and
# overridden by the environment) on every call, as happened with `||=`.
#
# Returns true or false.
def mute
  @mute = !!ENV['MUTE'] if @mute.nil?
  @mute
end
|
.poll_sleep_amount ⇒ Object
77
78
79
80
|
# File 'lib/resque/scheduler.rb', line 77
# Number of seconds to sleep between polls.
#
# Returns the memoized value, otherwise the RESQUE_SCHEDULER_INTERVAL
# environment variable (default '5') converted with Float(), which raises
# ArgumentError on a non-numeric string.
def poll_sleep_amount
  return @poll_sleep_amount if @poll_sleep_amount
  interval = ENV.fetch('RESQUE_SCHEDULER_INTERVAL', '5')
  @poll_sleep_amount = Float(interval)
end
|
.signal_queue ⇒ Object
17
18
19
|
# File 'lib/resque/scheduler.rb', line 17
# Queue of signal names recorded by the trap handlers and drained by
# handle_signals.
#
# Returns the same Array instance on every call.
def signal_queue
  return @signal_queue if @signal_queue
  @signal_queue = []
end
|
.verbose ⇒ Object
34
35
36
|
# File 'lib/resque/scheduler.rb', line 34
# Whether verbose logging is enabled.
#
# Defaults to true when the VERBOSE environment variable is set. The
# environment is consulted only while @verbose is still nil, so an
# explicitly assigned `false` is respected instead of being re-evaluated
# (and overridden by the environment) on every call, as happened with `||=`.
#
# Returns true or false.
def verbose
  @verbose = !!ENV['VERBOSE'] if @verbose.nil?
  @verbose
end
|
Class Method Details
.clear_schedule! ⇒ Object
Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler
346
347
348
349
350
351
|
# File 'lib/resque/scheduler.rb', line 346
# Stops the current rufus scheduler, forgets every scheduled job and
# returns a fresh scheduler.
def clear_schedule!
  rufus_scheduler.stop
  @@scheduled_jobs = {}
  # Dropping the memoized instance makes the next rufus_scheduler call
  # build a new one.
  @rufus_scheduler = nil
  rufus_scheduler
end
|
.configure ⇒ Object
Allows for block-style configuration
11
12
13
|
# File 'lib/resque/scheduler.rb', line 11
# Block-style configuration: yields the receiver so settings can be
# assigned inside the block.
#
# Returns whatever the block returns.
def configure
  yield(self)
end
|
.enqueue_delayed_items_for_timestamp(timestamp) ⇒ Object
Enqueues all delayed jobs for a timestamp
270
271
272
273
274
275
276
277
278
279
280
281
282
|
# File 'lib/resque/scheduler.rb', line 270
# Enqueues all delayed jobs for a timestamp.
#
# Keeps pulling items for +timestamp+ (while this process is master) and
# enqueues each one until no item is returned.
#
# timestamp - the delayed-queue timestamp whose items should be enqueued.
#
# Returns nil.
def enqueue_delayed_items_for_timestamp(timestamp)
  loop do
    # Reset on every pass: if the master lock is lost mid-drain, a stale
    # item from the previous pass must not keep the loop spinning.
    item = nil
    handle_shutdown do
      item = Resque.next_item_for_timestamp(timestamp) if is_master?
      if item
        log "queuing #{item['class']} [delayed]"
        handle_errors { enqueue_from_config(item) }
      end
    end
    break if item.nil?
  end
end
|
.enqueue_from_config(job_config) ⇒ Object
Enqueues a job based on a config hash
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
|
# File 'lib/resque/scheduler.rb', line 299
# Enqueues a job based on a config hash.
#
# job_config - Hash read with 'args', 'class' and 'queue' (String or Symbol
#              keys) and 'custom_job_class' (String key only).
#
# Dispatch:
# * custom_job_class set (and not 'Resque::Job'): call its .scheduled;
#   on NameError fall back to Resque::Job.create with the custom class name.
# * class resolves to a Class: run the before-delayed-enqueue hooks, then
#   call klass.scheduled if it exists, else Resque.enqueue_to.
# * otherwise (class name could not be constantized): Resque::Job.create.
def enqueue_from_config(job_config)
  args = job_config['args'] || job_config[:args]
  klass_name = job_config['class'] || job_config[:class]
  klass = begin
    ResqueScheduler::Util.constantize(klass_name)
  rescue StandardError
    # Keep the raw name when the constant cannot be resolved.
    klass_name
  end
  # A Hash is a single argument; anything else is coerced to an argument list.
  params = args.is_a?(Hash) ? [args] : Array(args)
  queue = job_config['queue'] || job_config[:queue] || Resque.queue_from_class(klass)

  custom_job_class = job_config['custom_job_class']
  if custom_job_class && custom_job_class != 'Resque::Job'
    begin
      ResqueScheduler::Util.constantize(custom_job_class).scheduled(queue, klass_name, *params)
    rescue NameError
      Resque::Job.create(queue, custom_job_class, *params)
    end
  elsif Class === klass
    ResqueScheduler::Plugin.run_before_delayed_enqueue_hooks(klass, *params)
    if klass.respond_to?(:scheduled)
      klass.scheduled(queue, klass_name, *params)
    else
      Resque.enqueue_to(queue, klass, *params)
    end
  else
    Resque::Job.create(queue, klass, *params)
  end
end
|
.env_matches?(configured_env) ⇒ Boolean
Returns true if the current env is non-nil and the configured env (which is a comma-split string) includes the current env.
253
254
255
|
# File 'lib/resque/scheduler.rb', line 253
# Tells whether the current env is listed in +configured_env+.
#
# configured_env - String of env names separated by commas and/or whitespace.
#
# Returns true/false, or the falsy current env when no env is known.
def env_matches?(configured_env)
  current_env = env
  return current_env unless current_env
  configured_env.split(/[\s,]+/).include?(current_env)
end
|
.handle_delayed_items(at_time = nil) ⇒ Object
Handles queueing delayed items.
at_time - Time to start scheduling items (default: now).
259
260
261
262
263
264
265
266
267
|
# File 'lib/resque/scheduler.rb', line 259
# Handles queueing delayed items.
#
# at_time - passed through to Resque.next_delayed_timestamp (default: nil).
#
# Enqueues the items of every timestamp Resque reports until it reports
# none. Returns nil.
def handle_delayed_items(at_time=nil)
  timestamp = Resque.next_delayed_timestamp(at_time)
  return unless timestamp

  procline('Processing Delayed Items')
  until timestamp.nil?
    enqueue_delayed_items_for_timestamp(timestamp)
    timestamp = Resque.next_delayed_timestamp(at_time)
  end
end
|
.handle_errors ⇒ Object
290
291
292
293
294
295
296
|
# File 'lib/resque/scheduler.rb', line 290
# Runs the given block, logging (instead of raising) ordinary errors.
#
# Only StandardError is rescued. SystemExit, Interrupt and other
# non-StandardError exceptions propagate, so shutdown requests are not
# swallowed here.
#
# Returns the block's value, or log_error's return value after a rescue.
def handle_errors
  yield
rescue StandardError => e
  log_error "#{e.class.name}: #{e.message}"
end
|
.handle_shutdown ⇒ Object
284
285
286
287
288
|
# File 'lib/resque/scheduler.rb', line 284
# Runs the given block unless a shutdown has been requested.
#
# Calls `exit` when @shutdown is set, both before yielding and again
# after the block returns, so a shutdown flagged while the block was
# running takes effect as soon as the block finishes.
def handle_shutdown
exit if @shutdown
yield
exit if @shutdown
end
|
.handle_signals ⇒ Object
149
150
151
152
153
154
155
156
157
158
159
160
|
# File 'lib/resque/scheduler.rb', line 149
# Drains signal_queue, logging and dispatching each queued signal name:
# INT / TERM / QUIT -> shutdown, USR1 -> print_schedule,
# USR2 -> reload_schedule!. Other names are logged and ignored.
#
# Returns nil.
def handle_signals
  while (signal = signal_queue.shift)
    log!("Got #{signal} signal")
    case signal
    when 'INT', 'TERM', 'QUIT'
      shutdown
    when 'USR1'
      print_schedule
    when 'USR2'
      reload_schedule!
    end
  end
end
|
.load_schedule! ⇒ Object
Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
|
# File 'lib/resque/scheduler.rb', line 174
# Pulls the schedule from Resque.schedule and loads it into the rufus
# scheduler instance.
#
# When `dynamic` is set the schedule is first reloaded through
# Resque.reload_schedule!. The :schedules_changed key is deleted once all
# jobs are loaded.
def load_schedule!
  procline('Loading Schedule')

  if dynamic
    Resque.reload_schedule!
  end
  if Resque.schedule.empty?
    log!('Schedule empty! Set Resque.schedule')
  end

  @@scheduled_jobs = {}
  Resque.schedule.each { |name, config| load_schedule_job(name, config) }

  Resque.redis.del(:schedules_changed)
  procline('Schedules Loaded')
end
|
.load_schedule_job(name, config) ⇒ Object
Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
|
# File 'lib/resque/scheduler.rb', line 208
# Loads a job schedule into the Rufus::Scheduler and stores it in
# @@scheduled_jobs.
#
# name   - schedule name, used as the @@scheduled_jobs key.
# config - Hash read with the String keys 'rails_env' / 'env', 'cron',
#          'every' and 'class'.
#
# The job is skipped when the configured env does not match the current
# env, or when neither 'cron' nor 'every' has a non-empty value ('cron' is
# checked first).
def load_schedule_job(name, config)
  configured_env = config['rails_env'] || config['env']
  unless configured_env.nil? || env_matches?(configured_env)
    return log("Skipping schedule of #{name} because configured " \
               "env #{configured_env.inspect} does not match current " \
               "env #{env.inspect}")
  end

  log!("Scheduling #{name} ")
  interval_types = %w{cron every}
  interval_type = interval_types.find do |type|
    !config[type].nil? && config[type].length > 0
  end
  unless interval_type
    return log!("no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping")
  end

  args = optionizate_interval_value(config[interval_type])
  @@scheduled_jobs[name] = rufus_scheduler.send(interval_type, *args) do
    # Only enqueue when this process is master at trigger time.
    if is_master?
      log!("queueing #{config['class']} (#{name})")
      handle_errors { enqueue_from_config(config) }
    end
  end
  nil
end
|
.log(msg) ⇒ Object
429
430
431
|
# File 'lib/resque/scheduler.rb', line 429
# Logs +msg+ at debug level. The message is passed as a block, so the
# logger decides whether it is evaluated.
def log(msg)
  logger.debug do
    msg
  end
end
|
.log!(msg) ⇒ Object
421
422
423
|
# File 'lib/resque/scheduler.rb', line 421
# Logs +msg+ at info level. The message is passed as a block, so the
# logger decides whether it is evaluated.
def log!(msg)
  logger.info do
    msg
  end
end
|
.log_error(msg) ⇒ Object
425
426
427
|
# File 'lib/resque/scheduler.rb', line 425
# Logs +msg+ at error level. The message is passed as a block, so the
# logger decides whether it is evaluated.
def log_error(msg)
  logger.error do
    msg
  end
end
|
.optionizate_interval_value(value) ⇒ Object
Converts an interval type value into a value with options, if options are available
192
193
194
195
196
197
198
199
200
201
202
203
204
|
# File 'lib/resque/scheduler.rb', line 192
# Converts an interval type value into a value with options, if options
# are available.
#
# value - either a plain interval value, or an Array of
#         [interval, options_hash].
#
# Returns:
# * +value+ unchanged when it is not an Array;
# * a NEW two-element Array [interval, options] with the option keys
#   symbolized when +value+ is [interval, Hash] -- the caller's array
#   (normally part of the schedule config) is left untouched;
# * the first element for any other Array shape (no usable options),
#   including a one-element Array holding a Hash, which previously raised
#   NoMethodError.
def optionizate_interval_value(value)
  return value unless value.is_a?(::Array)
  return value.first unless value.size == 2 && value.last.is_a?(::Hash)

  interval, options = value
  symbolized = options.inject({}) do |memo, (key, option)|
    memo[key.respond_to?(:to_sym) ? key.to_sym : key] = option
    memo
  end
  [interval, symbolized]
end
|
.poll_sleep ⇒ Object
384
385
386
387
388
389
390
391
392
393
|
# File 'lib/resque/scheduler.rb', line 384
# Sleeps between polls, honouring shutdown requests.
#
# Runs poll_sleep_loop inside handle_shutdown, so the process exits
# before or after the sleep when @shutdown is set.
#
# Returns true.
def poll_sleep
handle_shutdown do
begin
poll_sleep_loop
ensure
# poll_sleep_loop sets @sleeping to true; always clear it, even when the
# loop is left through an exception.
@sleeping = false
end
end
true
end
|
.poll_sleep_loop ⇒ Object
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
|
# File 'lib/resque/scheduler.rb', line 395
# Sleeps for poll_sleep_amount seconds in 0.01s slices, processing queued
# signals after every slice.
#
# Marks the scheduler as sleeping (@sleeping = true); the flag is read by
# shutdown to decide whether to raise Interrupt in the recorded thread.
def poll_sleep_loop
@sleeping = true
start = Time.now
loop do
break if (Time.now - start) >= poll_sleep_amount
begin
sleep 0.01
handle_signals
rescue Interrupt
# An Interrupt ends the sleep early. When it belongs to a shutdown,
# clean the schedules and give up the master lock first.
if @shutdown
Resque.clean_schedules
release_master_lock!
end
break
end
end
end
|
.print_schedule ⇒ Object
162
163
164
165
166
167
168
169
170
|
# File 'lib/resque/scheduler.rb', line 162
# Logs one info line per rufus job ("<t>\t<last>\t") under a header line.
#
# Returns the collection returned by rufus_scheduler.all_jobs, or nil when
# there is no scheduler.
def print_schedule
  scheduler = rufus_scheduler
  return unless scheduler

  log!("Scheduling Info\tLast Run")
  scheduler.all_jobs.each do |_job_id, job|
    log!("#{job.t}\t#{job.last}\t")
  end
end
|
.procline(string) ⇒ Object
433
434
435
436
437
438
|
# File 'lib/resque/scheduler.rb', line 433
# Logs +string+ at info level and sets the process title ($0) to the value
# produced by build_procline(string).
#
# Returns the new process title.
def procline(string)
  log!(string)
  build_procline(string).tap do |title|
    log("Setting procline #{title.inspect}")
    $0 = title
  end
end
|
.rails_env_matches?(config) ⇒ Boolean
Returns true if the given schedule config hash matches the current env
244
245
246
247
248
249
|
# File 'lib/resque/scheduler.rb', line 244
# Deprecated: use env_matches? instead. Emits a deprecation warning on
# every call.
#
# config - schedule config Hash; only config['rails_env'] is read.
#
# Returns true/false when both config['rails_env'] and the current env are
# present, otherwise the falsy value that was missing.
def rails_env_matches?(config)
  warn '`Resque::Scheduler.rails_env_matches?` is deprecated. ' \
       'Please use `Resque::Scheduler.env_matches?` instead.'

  configured = config['rails_env']
  return configured unless configured

  current_env = env
  return current_env unless current_env

  configured.split(/[\s,]+/).include?(current_env)
end
|
.register_signal_handlers ⇒ Object
For all signals, set the shutdown flag and wait for current poll/enqueuing to finish (should be almost instant). In the case of sleeping, exit immediately.
143
144
145
146
147
|
# File 'lib/resque/scheduler.rb', line 143
# Installs trap handlers for INT, TERM, USR1, USR2 and QUIT. Each handler
# only appends the signal name to signal_queue; the actual work happens
# later in handle_signals.
#
# Returns the list of handled signal names.
def register_signal_handlers
  handled = %w(INT TERM USR1 USR2 QUIT)
  handled.each { |name| trap(name) { signal_queue << name } }
end
|
.reload_schedule! ⇒ Object
353
354
355
356
357
|
# File 'lib/resque/scheduler.rb', line 353
# Discards the current schedule and loads it again.
#
# Returns the result of load_schedule!.
def reload_schedule!
  procline('Reloading Schedule')
  clear_schedule!
  load_schedule!
end
|
.rufus_scheduler ⇒ Object
340
341
342
|
# File 'lib/resque/scheduler.rb', line 340
# The Rufus::Scheduler instance, started on first use and memoized until
# @rufus_scheduler is reset.
def rufus_scheduler
  return @rufus_scheduler if @rufus_scheduler
  @rufus_scheduler = Rufus::Scheduler.start_new
end
|
.run ⇒ Object
Schedule all jobs and continually look for delayed jobs (never returns)
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
|
# File 'lib/resque/scheduler.rb', line 99
# Schedules all jobs and then continually looks for delayed jobs.
#
# Sets the process title, installs the signal handlers, loads the
# schedule, then loops: while this process is master it handles delayed
# items (and, when `dynamic`, schedule updates), then sleeps via
# poll_sleep. The loop has no normal exit; an Interrupt that reaches it is
# rescued and logged, which ends the method.
def run
$0 = "resque-scheduler: Starting"
register_signal_handlers
# Flush stdout/stderr on every write.
$stdout.sync = true
$stderr.sync = true
if dynamic
reload_schedule!
else
load_schedule!
end
begin
# Remember the polling thread: shutdown raises Interrupt in @th when the
# scheduler is sleeping.
@th = Thread.current
loop do
if is_master?
begin
handle_delayed_items
update_schedule if dynamic
rescue Errno::EAGAIN, Errno::ECONNRESET => e
# These two errors are only logged; the loop continues with the next poll.
log! e.message
end
end
poll_sleep
end
rescue Interrupt
log 'Exiting'
end
end
|
.scheduled_jobs ⇒ Object
the Rufus::Scheduler jobs that are scheduled
94
95
96
|
# File 'lib/resque/scheduler.rb', line 94
# The Rufus::Scheduler jobs that are scheduled.
#
# Returns the @@scheduled_jobs class variable itself (not a copy), which
# load_schedule_job fills keyed by schedule name.
def scheduled_jobs
@@scheduled_jobs
end
|
.shutdown ⇒ Object
Sets the shutdown flag, cleans schedules and exits if sleeping
414
415
416
417
418
419
|
# File 'lib/resque/scheduler.rb', line 414
# Requests a shutdown.
#
# Does nothing when a shutdown is already in progress. Otherwise sets
# @shutdown, logs the event and, if the scheduler is currently sleeping,
# raises Interrupt in the recorded thread (@th).
def shutdown
  unless @shutdown
    @shutdown = true
    log!('Shutting down')
    @th.raise(Interrupt) if @sleeping
  end
end
|
.unschedule_job(name) ⇒ Object
375
376
377
378
379
380
381
|
# File 'lib/resque/scheduler.rb', line 375
# Removes the named job from the rufus scheduler and from
# @@scheduled_jobs. Does nothing when no job is registered under +name+.
#
# Returns the removed job, or nil when there was none.
def unschedule_job(name)
  job = scheduled_jobs[name]
  return unless job

  log("Removing schedule #{name}")
  job.unschedule
  @@scheduled_jobs.delete(name)
end
|
.update_schedule ⇒ Object
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
|
# File 'lib/resque/scheduler.rb', line 359
# Applies pending schedule changes.
#
# When the :schedules_changed set is non-empty, reloads Resque's schedule
# and pops every changed name: each one is unscheduled, and rescheduled
# when it is still present in Resque.schedule.
#
# Returns nil when nothing changed, otherwise the result of the final
# procline call.
def update_schedule
  return unless Resque.redis.scard(:schedules_changed) > 0

  procline('Updating schedule')
  Resque.reload_schedule!
  while (changed_name = Resque.redis.spop(:schedules_changed))
    still_scheduled = Resque.schedule.keys.include?(changed_name)
    unschedule_job(changed_name)
    load_schedule_job(changed_name, Resque.schedule[changed_name]) if still_scheduled
  end
  procline('Schedules Loaded')
end
|