Class: OpenWFE::Scheduler
- Inherits:
-
Object
- Object
- OpenWFE::Scheduler
- Includes:
- MonitorMixin
- Defined in:
- lib/openwfe/util/scheduler.rb
Overview
The Scheduler is used by OpenWFEru for registering ‘at’ and ‘cron’ jobs. ‘at’ jobs to execute once at a given point in time. ‘cron’ jobs execute a specified intervals. The two main methods are thus schedule_at() and schedule().
schedule_at() and schedule() await either a Schedulable instance and params (usually an array or nil), either a block, which is more in the Ruby way.
Examples
scheduler.schedule_in("3d") do
regenerate_monthly_report()
end
#
# will call the regenerate_monthly_report method
# in 3 days from now
scheduler.schedule "0 22 * * 1-5" do
log.info "activating security system..."
activate_security_system()
end
job_id = scheduler.schedule_at "Sun Oct 07 14:24:01 +0900 2009" do
init_self_destruction_sequence()
end
an example that uses a Schedulable class :
class Regenerator < Schedulable
def trigger (frequency)
self.send(frequency)
end
def monthly
# ...
end
def yearly
# ...
end
end
regenerator = Regenerator.new
scheduler.schedule_in("4d", regenerator)
#
# will regenerate the report in four days
scheduler.schedule_in(
"5d",
{ :schedulable => regenerator, :scope => :month })
#
# will regenerate the monthly report in 5 days
There is also schedule_every() :
scheduler.schedule_every("1h20m") do
regenerate_latest_report()
end
The scheduler has a “exit_when_no_more_jobs” attribute. When set to ‘true’, the scheduler will exit as soon as there are no more jobs to run. Use with care though, if you create a scheduler, set this attribute to true and start the scheduler, the scheduler will immediately exit. This attribute is best used indirectly : the method join_until_no_more_jobs() wraps it.
The :scheduler_precision can be set when instantiating the scheduler.
scheduler = OpenWFE::Scheduler.new(:scheduler_precision => 0.500)
scheduler.start
#
# instatiates a scheduler that checks its jobs twice per second
# (the default is 4 times per second (0.250))
Tags
Since OpenWFEru 0.9.16, tags can be attached to jobs scheduled :
scheduler.schedule_in "2h", :tags => "backup" do
init_backup_sequence()
end
scheduler.schedule "0 24 * * *", :tags => "new_day" do
do_this_or_that()
end
jobs = find_jobs 'backup'
jobs.each { |job| job.unschedule }
Multiple tags may be attached to a single job :
scheduler.schedule_in "2h", :tags => [ "backup", "important" ] do
init_backup_sequence()
end
The vanilla case for tags assume they are String instances, but nothing prevents you from using anything else. The scheduler has no persistence by itself, so no serialization issue.
Cron up to the second
Since OpenWFEru 0.9.16, a cron schedule can be set at the second level :
scheduler.schedule "7 * * * * *" do
puts "it's now the seventh second of the minute"
end
The OpenWFEru scheduler recognizes an optional first column for second scheduling. This column can, like for the other columns, specify a value (“7”), a list of values (“7,8,9,27”) or a range (“7-12”).
Exceptions
The OpenWFEru scheduler will output a stacktrace to the STDOUT in case of exception. There are two ways to change that behaviour.
# 1 - providing a lwarn method to the scheduler instance :
class << scheduler
def lwarn (&block)
puts "oops, something wrong happened : "
puts block.call
end
end
# 2 - overriding the [protected] method log_exception(e) :
class << scheduler
def log_exception (e)
puts "something wrong happened : "+e.to_s
end
end
‘Every jobs’ and rescheduling
Every jobs can reschedule/unschedule themselves. A reschedule example :
schedule.schedule_every "5h" do |job_id, at, params|
mails = $inbox.fetch_mails
mails.each { |m| $inbox.mark_as_spam(m) if is_spam(m) }
params[:every] = if mails.size > 100
"1h" # lots of spam, check every hour
else
"5h" # normal schedule, every 5 hours
end
end
Unschedule example :
schedule.schedule_every "10s" do |job_id, at, params|
#
# polls every 10 seconds until a mail arrives
$mail = $inbox.fetch_last_mail
params[:dont_reschedule] = true if $mail
end
Instance Attribute Summary collapse
-
#precision ⇒ Object
By default, the precision is 0.250, with means the scheduler will check for jobs to execute 4 times per second.
-
#stopped ⇒ Object
As its name implies.
Class Method Summary collapse
-
.is_cron_string(s) ⇒ Object
Returns true if the given string seems to be a cron string.
Instance Method Summary collapse
-
#at_job_count ⇒ Object
Returns the current count of ‘at’ jobs scheduled (not ‘every’).
-
#cron_job_count ⇒ Object
Returns the number of cron jobs currently active in this scheduler.
-
#every_job_count ⇒ Object
Returns the current count of ‘every’ jobs scheduled.
-
#find_jobs(tag) ⇒ Object
Returns an array of jobs that have the given tag.
-
#find_schedulables(tag) ⇒ Object
Finds the jobs with the given tag and then returns an array of the wrapped Schedulable objects.
-
#get_job(job_id) ⇒ Object
Returns the job corresponding to job_id, an instance of AtJob or CronJob will be returned.
-
#get_schedulable(job_id) ⇒ Object
Finds a job (via get_job()) and then returns the wrapped schedulable if any.
-
#initialize(params = {}) ⇒ Scheduler
constructor
A new instance of Scheduler.
-
#join ⇒ Object
Joins on the scheduler thread.
-
#join_until_no_more_jobs ⇒ Object
Like join() but takes care of setting the ‘exit_when_no_more_jobs’ attribute of this scheduler to true before joining.
-
#pending_job_count ⇒ Object
Returns the number of currently pending jobs in this scheduler (‘at’ jobs and ‘every’ jobs).
-
#schedule(cron_line, params = {}, &block) ⇒ Object
Schedules a cron job, the ‘cron_line’ is a string following the Unix cron standard (see “man 5 crontab” in your command line, or www.google.com/search?q=man%205%20crontab).
-
#schedule_at(at, params = {}, &block) ⇒ Object
Schedules a job by specifying at which time it should trigger.
-
#schedule_every(freq, params = {}, &block) ⇒ Object
Schedules a job in a loop.
-
#schedule_in(duration, params = {}, &block) ⇒ Object
Schedules a job by stating in how much time it should trigger.
-
#sstart ⇒ Object
(also: #start)
Starts this scheduler (or restart it if it was previously stopped).
-
#sstop ⇒ Object
(also: #stop)
The scheduler is stoppable via sstop().
-
#unschedule(job_id) ⇒ Object
Unschedules an ‘at’ or a ‘cron’ job identified by the id it was given at schedule time.
-
#unschedule_cron_job(job_id) ⇒ Object
Unschedules a cron job.
Constructor Details
#initialize(params = {}) ⇒ Scheduler
Returns a new instance of Scheduler.
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 |
# File 'lib/openwfe/util/scheduler.rb', line 225 def initialize (params={}) super() @pending_jobs = [] @cron_jobs = {} @scheduler_thread = nil @precision = 0.250 # every 250ms, the scheduler wakes up (default value) begin @precision = Float(params[:scheduler_precision]) rescue Exception => e # let precision at its default value end @exit_when_no_more_jobs = false @dont_reschedule_every = false @last_cron_second = -1 @stopped = true end |
Instance Attribute Details
#precision ⇒ Object
By default, the precision is 0.250, with means the scheduler will check for jobs to execute 4 times per second.
217 218 219 |
# File 'lib/openwfe/util/scheduler.rb', line 217 def precision @precision end |
#stopped ⇒ Object
As its name implies.
222 223 224 |
# File 'lib/openwfe/util/scheduler.rb', line 222 def stopped @stopped end |
Class Method Details
.is_cron_string(s) ⇒ Object
Returns true if the given string seems to be a cron string.
616 617 618 |
# File 'lib/openwfe/util/scheduler.rb', line 616 def Scheduler.is_cron_string (s) s.match(".+ .+ .+ .+ .+") end |
Instance Method Details
#at_job_count ⇒ Object
Returns the current count of ‘at’ jobs scheduled (not ‘every’).
609 610 611 |
# File 'lib/openwfe/util/scheduler.rb', line 609 def at_job_count @pending_jobs.select { |j| j.instance_of?(AtJob) }.size end |
#cron_job_count ⇒ Object
Returns the number of cron jobs currently active in this scheduler.
595 596 597 |
# File 'lib/openwfe/util/scheduler.rb', line 595 def cron_job_count @cron_jobs.size end |
#every_job_count ⇒ Object
Returns the current count of ‘every’ jobs scheduled.
602 603 604 |
# File 'lib/openwfe/util/scheduler.rb', line 602 def every_job_count @pending_jobs.select { |j| j.is_a?(EveryJob) }.size end |
#find_jobs(tag) ⇒ Object
Returns an array of jobs that have the given tag.
552 553 554 555 556 557 558 559 560 561 562 563 |
# File 'lib/openwfe/util/scheduler.rb', line 552 def find_jobs (tag) result = @cron_jobs.values.find_all do |job| job.has_tag?(tag) end synchronize do result + @pending_jobs.find_all do |job| job.has_tag?(tag) end end end |
#find_schedulables(tag) ⇒ Object
Finds the jobs with the given tag and then returns an array of the wrapped Schedulable objects. Jobs that haven’t a wrapped Schedulable won’t be included in the result.
571 572 573 574 575 576 577 578 579 580 581 582 |
# File 'lib/openwfe/util/scheduler.rb', line 571 def find_schedulables (tag) jobs = find_jobs(tag) result = [] jobs.each do |job| result.push(job.schedulable) if job.respond_to?(:schedulable) end result end |
#get_job(job_id) ⇒ Object
Returns the job corresponding to job_id, an instance of AtJob or CronJob will be returned.
522 523 524 525 526 527 528 529 530 531 532 |
# File 'lib/openwfe/util/scheduler.rb', line 522 def get_job (job_id) job = @cron_jobs[job_id] return job if job synchronize do @pending_jobs.find do |job| job.job_id == job_id end end end |
#get_schedulable(job_id) ⇒ Object
Finds a job (via get_job()) and then returns the wrapped schedulable if any.
538 539 540 541 542 543 544 545 546 547 |
# File 'lib/openwfe/util/scheduler.rb', line 538 def get_schedulable (job_id) return nil unless job_id j = get_job(job_id) return j.schedulable if j.respond_to?(:schedulable) nil end |
#join ⇒ Object
Joins on the scheduler thread
289 290 291 292 |
# File 'lib/openwfe/util/scheduler.rb', line 289 def join @scheduler_thread.join end |
#join_until_no_more_jobs ⇒ Object
Like join() but takes care of setting the ‘exit_when_no_more_jobs’ attribute of this scheduler to true before joining. Thus the scheduler will exit (and the join terminates) as soon as there aren’t no more ‘at’ (or ‘every’) jobs in the scheduler.
Currently used only in unit tests.
302 303 304 305 306 |
# File 'lib/openwfe/util/scheduler.rb', line 302 def join_until_no_more_jobs @exit_when_no_more_jobs = true join end |
#pending_job_count ⇒ Object
Returns the number of currently pending jobs in this scheduler (‘at’ jobs and ‘every’ jobs).
588 589 590 |
# File 'lib/openwfe/util/scheduler.rb', line 588 def pending_job_count @pending_jobs.size end |
#schedule(cron_line, params = {}, &block) ⇒ Object
Schedules a cron job, the ‘cron_line’ is a string following the Unix cron standard (see “man 5 crontab” in your command line, or www.google.com/search?q=man%205%20crontab).
For example :
scheduler.schedule("5 0 * * *", s)
# will trigger the schedulable s every day
# five minutes after midnight
scheduler.schedule("15 14 1 * *", s)
# will trigger s at 14:15 on the first of every month
scheduler.schedule("0 22 * * 1-5") do
puts "it's break time..."
end
# outputs a message every weekday at 10pm
Returns the job id attributed to this ‘cron job’, this id can be used to unschedule the job.
This method returns a job identifier which can be used to unschedule() the job.
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 |
# File 'lib/openwfe/util/scheduler.rb', line 494 def schedule (cron_line, params={}, &block) synchronize do params = prepare_params(params) # # is a job with the same id already scheduled ? cron_id = params[:cron_id] cron_id = params[:job_id] unless cron_id unschedule(cron_id) if cron_id # # schedule b = to_block(params, &block) job = CronJob.new(self, cron_id, cron_line, params, &b) @cron_jobs[job.job_id] = job job.job_id end end |
#schedule_at(at, params = {}, &block) ⇒ Object
Schedules a job by specifying at which time it should trigger. Returns the a job_id that can be used to unschedule the job.
This method returns a job identifier which can be used to unschedule() the job.
If the job is specified in the past, it will be triggered immediately but not scheduled. To avoid the triggering, the parameter :discard_past may be set to true :
jobid = scheduler.schedule_at(yesterday, :discard_past => true) do
puts "you'll never read this message"
end
And ‘jobid’ will hold a nil (not scheduled).
327 328 329 330 331 332 |
# File 'lib/openwfe/util/scheduler.rb', line 327 def schedule_at (at, params={}, &block) params = prepare_params(params) sschedule_at(at, params, &block) end |
#schedule_every(freq, params = {}, &block) ⇒ Object
Schedules a job in a loop. After an execution, it will not execute before the time specified in ‘freq’.
This method returns a job identifier which can be used to unschedule() the job.
In case of exception in the job, it will be rescheduled. If you don’t want the job to be rescheduled, set the parameter :try_again to false.
scheduler.schedule_every "500", :try_again => false do
do_some_prone_to_error_stuff()
# won't get rescheduled in base of exception
end
365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 |
# File 'lib/openwfe/util/scheduler.rb', line 365 def schedule_every (freq, params={}, &block) f = duration_to_f freq params = prepare_params params schedulable = params[:schedulable] params[:every] = freq last_at = params[:last_at] next_at = if last_at last_at + f else Time.now.to_f + f end sschedule_at next_at, params do |job_id, at| # # trigger ... hit_exception = false begin if schedulable schedulable.trigger params else block.call job_id, at, params end rescue Exception => e log_exception e hit_exception = true end # cannot use a return here !!! (block) #unless (hit_exception and params[:try_again] == false) # # # # reschedule ... # params[:job_id] = job_id # params[:last_at] = at # # schedule_every(params[:every], params, &block) \ # unless @dont_reschedule_every # # # # yes, this is a kind of recursion #end unless \ @dont_reschedule_every or (params[:dont_reschedule] == true) or (hit_exception and params[:try_again] == false) # # ok, reschedule ... params[:job_id] = job_id params[:last_at] = at schedule_every params[:every], params, &block # # yes, this is a kind of recursion # note that params[:every] might have been changed # by the block/schedulable code end job_id end end |
#schedule_in(duration, params = {}, &block) ⇒ Object
Schedules a job by stating in how much time it should trigger. Returns the a job_id that can be used to unschedule the job.
This method returns a job identifier which can be used to unschedule() the job.
342 343 344 345 346 347 348 |
# File 'lib/openwfe/util/scheduler.rb', line 342 def schedule_in (duration, params={}, &block) duration = duration_to_f(duration) params = prepare_params(params) schedule_at(Time.new.to_f + duration, params, &block) end |
#sstart ⇒ Object Also known as: start
Starts this scheduler (or restart it if it was previously stopped)
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/openwfe/util/scheduler.rb', line 253 def sstart @stopped = false @scheduler_thread = Thread.new do if defined?(JRUBY_VERSION) require 'java' java.lang.Thread.current_thread.name = \ "openwferu scheduler (Ruby Thread)" end loop do break if @stopped step sleep @precision end end end |
#sstop ⇒ Object Also known as: stop
The scheduler is stoppable via sstop()
278 279 280 281 |
# File 'lib/openwfe/util/scheduler.rb', line 278 def sstop @stopped = true end |
#unschedule(job_id) ⇒ Object
Unschedules an ‘at’ or a ‘cron’ job identified by the id it was given at schedule time.
442 443 444 445 446 447 448 449 450 451 452 453 454 |
# File 'lib/openwfe/util/scheduler.rb', line 442 def unschedule (job_id) synchronize do for i in 0...@pending_jobs.length if @pending_jobs[i].job_id == job_id @pending_jobs.delete_at i return true end end unschedule_cron_job job_id end end |
#unschedule_cron_job(job_id) ⇒ Object
Unschedules a cron job
459 460 461 462 463 464 465 466 467 |
# File 'lib/openwfe/util/scheduler.rb', line 459 def unschedule_cron_job (job_id) synchronize do if @cron_jobs.has_key?(job_id) @cron_jobs.delete job_id return true end false end end |