Class: OpenWFE::Scheduler

Inherits:
Object
  • Object
Includes:
MonitorMixin
Defined in:
lib/openwfe/util/scheduler.rb

Overview

The Scheduler is used by OpenWFEru for registering ‘at’ and ‘cron’ jobs. ‘at’ jobs execute once at a given point in time. ‘cron’ jobs execute at specified intervals. The two main methods are thus schedule_at() and schedule().

schedule_at() and schedule() accept either a Schedulable instance and params (usually an array or nil), or a block, which is the more idiomatic Ruby style.

Examples

scheduler.schedule_in("3d") do
    regenerate_monthly_report()
end
    # 
    # will call the regenerate_monthly_report method 
    # in 3 days from now

 scheduler.schedule "0 22 * * 1-5" do 
     log.info "activating security system..."
     activate_security_system()
 end

 job_id = scheduler.schedule_at "Sun Oct 07 14:24:01 +0900 2009" do
     init_self_destruction_sequence()
 end

An example that uses a Schedulable class :

class Regenerator < Schedulable
    def trigger (frequency)
        self.send(frequency)
    end
    def monthly
        # ...
    end
    def yearly
        # ...
    end
end

regenerator = Regenerator.new

scheduler.schedule_in("4d", regenerator)
    # 
    # will regenerate the report in four days

scheduler.schedule_in(
    "5d", 
    { :schedulable => regenerator, :scope => :month })
        #
        # will regenerate the monthly report in 5 days

There is also schedule_every() :

scheduler.schedule_every("1h20m") do
    regenerate_latest_report()
end

The scheduler has an “exit_when_no_more_jobs” attribute. When set to ‘true’, the scheduler will exit as soon as there are no more jobs to run. Use it with care: if you create a scheduler, set this attribute to true and start the scheduler before scheduling any job, the scheduler will exit immediately. This attribute is best used indirectly : the method join_until_no_more_jobs() wraps it.

The :scheduler_precision can be set when instantiating the scheduler.

scheduler = OpenWFE::Scheduler.new(:scheduler_precision => 0.500)
scheduler.start
    #
    # instantiates a scheduler that checks its jobs twice per second
    # (the default is 4 times per second (0.250))

Tags

Since OpenWFEru 0.9.16, tags can be attached to scheduled jobs :

scheduler.schedule_in "2h", :tags => "backup" do
    init_backup_sequence()
end

scheduler.schedule "0 0 * * *", :tags => "new_day" do
    do_this_or_that()
end

jobs = scheduler.find_jobs 'backup'
jobs.each { |job| job.unschedule }

Multiple tags may be attached to a single job :

scheduler.schedule_in "2h", :tags => [ "backup", "important" ]  do
    init_backup_sequence()
end

The vanilla case for tags assumes they are String instances, but nothing prevents you from using anything else. The scheduler has no persistence by itself, so there is no serialization issue.
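The tag matching described above can be sketched without the scheduler. TaggedJob below is a hypothetical stand-in, not an OpenWFEru class, and its has_tag? is an assumption about how a single tag and an array of tags might be treated alike.

```ruby
# Hypothetical stand-in for a scheduled job; only the tag handling is modelled.
class TaggedJob
  attr_reader :job_id, :tags

  def initialize(job_id, tags = nil)
    @job_id = job_id
    # Array() turns nil into [], a single tag into [tag], and keeps arrays as-is.
    @tags = Array(tags)
  end

  def has_tag?(tag)
    @tags.include?(tag)
  end
end

jobs = [
  TaggedJob.new(1, "backup"),
  TaggedJob.new(2, ["backup", "important"]),
  TaggedJob.new(3, :nightly), # tags need not be String instances
  TaggedJob.new(4)
]

p jobs.select { |job| job.has_tag?("backup") }.map(&:job_id)
```

Since tags are compared with include?, any object with a sensible == can serve as a tag in this sketch.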

Cron up to the second

Since OpenWFEru 0.9.16, a cron schedule can be set at the second level :

scheduler.schedule "7 * * * * *" do
    puts "it's now the seventh second of the minute"
end

The OpenWFEru scheduler recognizes an optional first column for second scheduling. Like the other columns, this column can specify a value (“7”), a list of values (“7,8,9,27”) or a range (“7-12”).
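The three column forms can be illustrated with a small expansion helper. This is a minimal sketch, not the OpenWFEru cron parser: expand_cron_field and cron_field_matches? are hypothetical names, and treating “*” as "matches everything" is the usual cron convention rather than something stated on this page.

```ruby
# Hypothetical helper, not the OpenWFEru parser: expands one cron column
# into the sorted list of values it names, or nil for the "*" wildcard.
def expand_cron_field(field)
  return nil if field == "*"

  values = field.split(",").flat_map do |item|
    if item.include?("-")
      first, last = item.split("-", 2).map { |v| Integer(v, 10) }
      (first..last).to_a
    else
      [Integer(item, 10)]
    end
  end

  values.uniq.sort
end

# True when `value` is accepted by the column.
def cron_field_matches?(field, value)
  values = expand_cron_field(field)
  values.nil? || values.include?(value)
end

p expand_cron_field("7")          # => [7]
p expand_cron_field("7,8,9,27")   # => [7, 8, 9, 27]
p expand_cron_field("7-12")       # => [7, 8, 9, 10, 11, 12]
p cron_field_matches?("*", 42)    # => true
```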

Exceptions

The OpenWFEru scheduler will output a stack trace to STDOUT when a job raises an exception. There are two ways to change that behaviour.

# 1 - providing a lwarn method to the scheduler instance :

class << scheduler
    def lwarn (&block)
        puts "oops, something wrong happened : "
        puts block.call
    end
end

# 2 - overriding the [protected] method log_exception(e) :

class << scheduler
    def log_exception (e)
        puts "something wrong happened : "+e.to_s
    end
end
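Both approaches rely on reopening the singleton class of one scheduler instance. The mechanism can be tried in isolation with the sketch below; MiniRunner and install_custom_logger are hypothetical names used only for illustration, and the return values stand in for the output the real scheduler would print.

```ruby
# Hypothetical stand-in for the scheduler: runs a block and routes any
# exception to log_exception, which can be overridden per instance.
class MiniRunner
  def run
    yield
  rescue StandardError => e
    log_exception(e)
  end

  protected

  def log_exception(e)
    "default: #{e.message}"
  end
end

# Reopens the singleton class of one instance, as in approach 2 above.
def install_custom_logger(runner)
  class << runner
    def log_exception(e)
      "custom: #{e.message}"
    end
  end
  runner
end

plain  = MiniRunner.new
custom = install_custom_logger(MiniRunner.new)

puts(plain.run  { raise "boom" })   # default: boom
puts(custom.run { raise "boom" })   # custom: boom
```

Only the instance whose singleton class was reopened changes behaviour; other instances keep the default handler.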

‘Every jobs’ and rescheduling

‘Every’ jobs can reschedule or unschedule themselves. A reschedule example :

scheduler.schedule_every "5h" do |job_id, at, params|

    mails = $inbox.fetch_mails
    mails.each { |m| $inbox.mark_as_spam(m) if is_spam(m) }

    params[:every] = if mails.size > 100
        "1h" # lots of spam, check every hour
    else
        "5h" # normal schedule, every 5 hours
    end
end

Unschedule example :

scheduler.schedule_every "10s" do |job_id, at, params|
    #
    # polls every 10 seconds until a mail arrives

    $mail = $inbox.fetch_last_mail

    params[:dont_reschedule] = true if $mail
end
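The effect of editing params inside the block can be replayed without a real clock. The simulation below is a sketch under assumptions: simulate_every is a hypothetical helper, and :every is given directly in seconds rather than as a duration string.

```ruby
# Hypothetical simulation, no real clock: replays the reschedule rule of an
# 'every' job. `freq_seconds` stands in for the parsed :every duration.
def simulate_every(start_at, freq_seconds, max_runs)
  params = { :every => freq_seconds }
  at = start_at + freq_seconds
  run_times = []

  max_runs.times do
    run_times << at
    yield at, params                            # the job body may edit params
    break if params[:dont_reschedule] == true   # the job unscheduled itself
    at += params[:every]                        # next run uses the current interval
  end

  run_times
end

# Switch from every 5 hours to every hour once 10 hours have passed.
p simulate_every(0, 5 * 3600, 4) { |at, params|
  params[:every] = 3600 if at >= 10 * 3600
}
# => [18000, 36000, 39600, 43200]

# Poll every 10 seconds, stop at the third run.
p simulate_every(0, 10, 10) { |at, params|
  params[:dont_reschedule] = true if at >= 30
}
# => [10, 20, 30]
```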

Constructor Details

#initialize(params = {}) ⇒ Scheduler

Returns a new instance of Scheduler.



# File 'lib/openwfe/util/scheduler.rb', line 225

def initialize (params={})

    super()

    @pending_jobs = []
    @cron_jobs = {}

    @scheduler_thread = nil

    @precision = 0.250
        # every 250ms, the scheduler wakes up (default value)
    begin
        @precision = Float(params[:scheduler_precision])
    rescue Exception => e
        # let precision at its default value
    end

    @exit_when_no_more_jobs = false
    @dont_reschedule_every = false

    @last_cron_second = -1

    @stopped = true
end
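The super() call in the constructor matters because the class includes MonitorMixin, which sets up its lock during initialization. The pattern can be sketched on its own as follows; JobRegistry is a hypothetical class, not part of OpenWFEru.

```ruby
require 'monitor'

# Hypothetical job registry, not the Scheduler itself: shows the MonitorMixin
# pattern of calling super() in initialize and guarding state with synchronize.
class JobRegistry
  include MonitorMixin

  def initialize
    super()             # lets MonitorMixin set up the monitor
    @jobs = {}
  end

  def add(job_id, job)
    synchronize do
      remove(job_id)    # re-enters the monitor from the same thread: allowed
      @jobs[job_id] = job
      job_id
    end
  end

  def remove(job_id)
    synchronize { !@jobs.delete(job_id).nil? }
  end

  def size
    synchronize { @jobs.size }
  end
end

registry = JobRegistry.new
workers = 4.times.map do |i|
  Thread.new { 25.times { |j| registry.add([i, j], "job") } }
end
workers.each(&:join)
p registry.size   # => 100
```

A monitor is reentrant, so a synchronized method can call another synchronized method on the same object without deadlocking, which is the situation schedule() and unschedule() are in.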

Instance Attribute Details

#precision ⇒ Object

By default, the precision is 0.250, which means the scheduler will check for jobs to execute 4 times per second.



# File 'lib/openwfe/util/scheduler.rb', line 217

def precision
  @precision
end

#stopped ⇒ Object

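True when the scheduler is stopped (the initial state, and the state after stop() has been called), false once it has been started.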



# File 'lib/openwfe/util/scheduler.rb', line 222

def stopped
  @stopped
end

Class Method Details

.is_cron_string(s) ⇒ Object

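Returns a truthy value (a MatchData) if the given string seems to be a cron string, that is, if it contains at least five space-separated fields. Returns nil otherwise.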



# File 'lib/openwfe/util/scheduler.rb', line 616

def Scheduler.is_cron_string (s)
    s.match(".+ .+ .+ .+ .+")
end
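The check above is deliberately loose: it looks for five space-separated chunks and does not validate their content. The sketch below applies the same pattern through a hypothetical helper, looks_like_cron?, that coerces the result to true or false.

```ruby
# Same pattern as Scheduler.is_cron_string, wrapped in a hypothetical helper
# that turns the MatchData / nil result into true / false.
def looks_like_cron?(s)
  !s.match(".+ .+ .+ .+ .+").nil?
end

p looks_like_cron?("0 22 * * 1-5")              # => true  (five columns)
p looks_like_cron?("7 * * * * *")               # => true  (six columns, with seconds)
p looks_like_cron?("3d")                        # => false (a duration string)
p looks_like_cron?("one two three four five")   # => true  (content is not validated)
```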

Instance Method Details

#at_job_count ⇒ Object

Returns the current count of ‘at’ jobs scheduled (not ‘every’).



# File 'lib/openwfe/util/scheduler.rb', line 609

def at_job_count
    @pending_jobs.select { |j| j.instance_of?(AtJob) }.size
end

#cron_job_count ⇒ Object

Returns the number of cron jobs currently active in this scheduler.



# File 'lib/openwfe/util/scheduler.rb', line 595

def cron_job_count
    @cron_jobs.size
end

#every_job_count ⇒ Object

Returns the current count of ‘every’ jobs scheduled.



# File 'lib/openwfe/util/scheduler.rb', line 602

def every_job_count
    @pending_jobs.select { |j| j.is_a?(EveryJob) }.size
end

#find_jobs(tag) ⇒ Object

Returns an array of jobs that have the given tag.



# File 'lib/openwfe/util/scheduler.rb', line 552

def find_jobs (tag)

    result = @cron_jobs.values.find_all do |job|
        job.has_tag?(tag)
    end

    synchronize do
        result + @pending_jobs.find_all do |job|
            job.has_tag?(tag)
        end
    end
end

#find_schedulables(tag) ⇒ Object

Finds the jobs with the given tag and then returns an array of the wrapped Schedulable objects. Jobs that do not wrap a Schedulable are not included in the result.



# File 'lib/openwfe/util/scheduler.rb', line 571

def find_schedulables (tag)

    jobs = find_jobs(tag)

    result = []

    jobs.each do |job|
        result.push(job.schedulable) if job.respond_to?(:schedulable)
    end

    result
end

#get_job(job_id) ⇒ Object

Returns the job corresponding to job_id: an instance of AtJob or CronJob, or nil if no job has that id.



# File 'lib/openwfe/util/scheduler.rb', line 522

def get_job (job_id)

    job = @cron_jobs[job_id]
    return job if job

    synchronize do
        @pending_jobs.find do |job|
            job.job_id == job_id
        end
    end
end

#get_schedulable(job_id) ⇒ Object

Finds a job (via get_job()) and then returns the wrapped schedulable if any.



# File 'lib/openwfe/util/scheduler.rb', line 538

def get_schedulable (job_id)

    return nil unless job_id

    j = get_job(job_id)

    return j.schedulable if j.respond_to?(:schedulable)

    nil
end

#join ⇒ Object

Joins on the scheduler thread, blocking the caller until that thread exits.



# File 'lib/openwfe/util/scheduler.rb', line 289

def join

    @scheduler_thread.join
end

#join_until_no_more_jobs ⇒ Object

Like join() but takes care of setting the ‘exit_when_no_more_jobs’ attribute of this scheduler to true before joining. Thus the scheduler will exit (and the join will terminate) as soon as there are no more ‘at’ (or ‘every’) jobs in the scheduler.

Currently used only in unit tests.



# File 'lib/openwfe/util/scheduler.rb', line 302

def join_until_no_more_jobs

    @exit_when_no_more_jobs = true
    join
end
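The interplay between the flag and the join can be sketched with a plain worker thread. DrainingWorker is a hypothetical class, not library code; it only models "exit once the flag is set and no job is left", which is the behaviour described above.

```ruby
# Hypothetical miniature: a worker thread that exits once its queue is empty
# and the exit flag is set, which is what lets the join return.
class DrainingWorker
  attr_reader :done

  def initialize(jobs)
    @jobs = jobs.dup
    @done = []
    @exit_when_no_more_jobs = false
    @thread = nil
  end

  def start
    @thread = Thread.new do
      loop do
        job = @jobs.shift
        @done << job.call if job
        break if @exit_when_no_more_jobs && @jobs.empty?
        sleep 0.01
      end
    end
  end

  def join_until_no_more_jobs
    @exit_when_no_more_jobs = true
    @thread.join
  end
end

worker = DrainingWorker.new([-> { :first }, -> { :second }])
worker.start
worker.join_until_no_more_jobs
p worker.done   # => [:first, :second]
```

Setting the flag only at join time avoids the pitfall of a freshly started, still empty scheduler exiting at once.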

#pending_job_count ⇒ Object

Returns the number of currently pending jobs in this scheduler (‘at’ jobs and ‘every’ jobs).



# File 'lib/openwfe/util/scheduler.rb', line 588

def pending_job_count
    @pending_jobs.size
end

#schedule(cron_line, params = {}, &block) ⇒ Object

Schedules a cron job. The ‘cron_line’ is a string following the Unix cron standard (see “man 5 crontab” in your command line, or www.google.com/search?q=man%205%20crontab).

For example :

scheduler.schedule("5 0 * * *", s)
    # will trigger the schedulable s every day
    # five minutes after midnight

scheduler.schedule("15 14 1 * *", s)
    # will trigger s at 14:15 on the first of every month

scheduler.schedule("0 22 * * 1-5") do
    puts "it's break time..."
end
    # outputs a message every weekday at 10pm

Returns the job id attributed to this ‘cron job’. This id can be used to unschedule() the job.



# File 'lib/openwfe/util/scheduler.rb', line 494

def schedule (cron_line, params={}, &block)
    synchronize do

        params = prepare_params(params)
    
        #
        # is a job with the same id already scheduled ?

        cron_id = params[:cron_id]
        cron_id = params[:job_id] unless cron_id

        unschedule(cron_id) if cron_id

        #
        # schedule

        b = to_block(params, &block)
        job = CronJob.new(self, cron_id, cron_line, params, &b)
        @cron_jobs[job.job_id] = job

        job.job_id
    end
end

#schedule_at(at, params = {}, &block) ⇒ Object

Schedules a job by specifying at which time it should trigger. Returns a job_id that can be used to unschedule() the job.

If the job is specified in the past, it will be triggered immediately but not scheduled. To avoid the triggering, the parameter :discard_past may be set to true :

jobid = scheduler.schedule_at(yesterday, :discard_past => true) do
    puts "you'll never read this message"
end

‘jobid’ will then hold nil (the job was not scheduled).
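The rule for past times can be written down as a small decision helper. This is a sketch, not library code: classify_at is a hypothetical name, and treating a time equal to ‘now’ as already past is an assumption this page does not confirm.

```ruby
# Hypothetical decision helper, not library code: classifies a requested
# trigger time against `now` following the rule described above.
# Assumption: a time equal to `now` counts as past.
def classify_at(at, now, params = {})
  return :scheduled if at > now

  params[:discard_past] ? :discarded : :triggered_immediately
end

now = Time.now
yesterday = now - 24 * 3600
tomorrow  = now + 24 * 3600

p classify_at(tomorrow, now)                             # => :scheduled
p classify_at(yesterday, now)                            # => :triggered_immediately
p classify_at(yesterday, now, :discard_past => true)     # => :discarded
```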



# File 'lib/openwfe/util/scheduler.rb', line 327

def schedule_at (at, params={}, &block)

    params = prepare_params(params)

    sschedule_at(at, params, &block)
end

#schedule_every(freq, params = {}, &block) ⇒ Object

Schedules a job in a loop. After each execution, the job will not run again before the interval specified in ‘freq’ has elapsed.

This method returns a job identifier which can be used to unschedule() the job.

In case of exception in the job, it will be rescheduled. If you don’t want the job to be rescheduled, set the parameter :try_again to false.

scheduler.schedule_every "500", :try_again => false do
    do_some_prone_to_error_stuff()
        # won't get rescheduled in case of exception
end


# File 'lib/openwfe/util/scheduler.rb', line 365

def schedule_every (freq, params={}, &block)

    f = duration_to_f freq

    params = prepare_params params
    schedulable = params[:schedulable]
    params[:every] = freq

    last_at = params[:last_at]
    next_at = if last_at
        last_at + f
    else
        Time.now.to_f + f
    end

    sschedule_at next_at, params do |job_id, at|

        #
        # trigger ...

        hit_exception = false

        begin

            if schedulable
                schedulable.trigger params
            else
                block.call job_id, at, params
            end

        rescue Exception => e

            log_exception e

            hit_exception = true
        end

        # cannot use a return here !!! (block)

        #unless (hit_exception and params[:try_again] == false)
        #    #
        #    # reschedule ...
        #    params[:job_id] = job_id
        #    params[:last_at] = at
        #    
        #    schedule_every(params[:every], params, &block) \
        #        unless @dont_reschedule_every
        #            #
        #            # yes, this is a kind of recursion
        #end
        unless \
            @dont_reschedule_every or
            (params[:dont_reschedule] == true) or
            (hit_exception and params[:try_again] == false)

            #
            # ok, reschedule ...

            params[:job_id] = job_id
            params[:last_at] = at
            
            schedule_every params[:every], params, &block
                #
                # yes, this is a kind of recursion

                # note that params[:every] might have been changed
                # by the block/schedulable code
        end

        job_id
    end
end
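The next_at computation in the listing above anchors each run on the previous trigger time instead of on the current time. The sketch below restates it with plain numbers; next_trigger_time is a hypothetical helper, and it assumes the ‘at’ value handed to the block is the time the job was scheduled for.

```ruby
# Hypothetical restatement of the next_at computation in schedule_every.
# All arguments are seconds expressed as Floats.
def next_trigger_time(freq_seconds, last_at, now)
  if last_at
    last_at + freq_seconds   # anchored on the previous trigger time
  else
    now + freq_seconds       # first run: anchored on the current time
  end
end

first  = next_trigger_time(10.0, nil, 100.0)
# The job body took 3.7 seconds, yet the next run is still 10 seconds
# after the previous trigger time.
second = next_trigger_time(10.0, first, first + 3.7)

p first    # => 110.0
p second   # => 120.0
```

Anchoring on last_at keeps the interval from growing by the duration of each job run.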

#schedule_in(duration, params = {}, &block) ⇒ Object

Schedules a job by stating in how much time it should trigger. Returns a job_id that can be used to unschedule() the job.



# File 'lib/openwfe/util/scheduler.rb', line 342

def schedule_in (duration, params={}, &block)

    duration = duration_to_f(duration)
    params = prepare_params(params)

    schedule_at(Time.new.to_f + duration, params, &block)
end
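The listing above converts the duration with duration_to_f, whose source is not shown on this page. As an illustration only, a parser for strings such as "3d" or "1h20m" could look like the sketch below. parse_duration and UNIT_SECONDS are hypothetical names, the unit letters (s, m, h, d) are inferred from the examples on this page, and a bare number string such as "500" is rejected because its unit is not stated here.

```ruby
# Hypothetical parser, not the library's duration_to_f. The unit letters are
# an assumption based on the examples in this page.
UNIT_SECONDS = { "s" => 1, "m" => 60, "h" => 3600, "d" => 86_400 }.freeze

def parse_duration(text)
  return Float(text) if text.is_a?(Numeric)   # already a number of seconds

  parts = text.scan(/(\d+)([smhd])/)
  # Reject input that is not entirely made of <digits><unit> groups.
  if parts.empty? || parts.map(&:join).join != text
    raise ArgumentError, "cannot parse duration: #{text.inspect}"
  end

  parts.sum { |count, unit| Integer(count, 10) * UNIT_SECONDS.fetch(unit) }.to_f
end

p parse_duration("3d")      # => 259200.0
p parse_duration("1h20m")   # => 4800.0
p parse_duration("10s")     # => 10.0
```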

#sstart ⇒ Object Also known as: start

Starts this scheduler (or restarts it if it was previously stopped).



# File 'lib/openwfe/util/scheduler.rb', line 253

def sstart

    @stopped = false

    @scheduler_thread = Thread.new do

        if defined?(JRUBY_VERSION)

            require 'java'

            java.lang.Thread.current_thread.name = \
                "openwferu scheduler (Ruby Thread)"
        end

        loop do
            break if @stopped
            step
            sleep @precision
        end
    end
end
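The wake-up loop above can be reduced to a few lines for experimentation. MiniTicker is a hypothetical class, not the Scheduler: its tick counter stands in for step(), and the precision argument plays the role of :scheduler_precision.

```ruby
# Hypothetical miniature of the start/stop loop, not the Scheduler itself.
class MiniTicker
  attr_reader :ticks

  def initialize(precision = 0.250)
    @precision = precision
    @stopped = true
    @ticks = 0
    @thread = nil
  end

  def start
    @stopped = false
    @thread = Thread.new do
      loop do
        break if @stopped
        @ticks += 1        # stands in for step()
        sleep @precision
      end
    end
  end

  def stop
    @stopped = true        # the loop notices this at its next wake-up
  end

  def join
    @thread.join
  end
end

ticker = MiniTicker.new(0.05)
ticker.start
sleep 0.2
ticker.stop
ticker.join
puts "ticked #{ticker.ticks} times"
```

Because the flag is only checked once per wake-up, stopping can take up to one precision interval to take effect.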

#sstop ⇒ Object Also known as: stop

Stops the scheduler. The scheduler thread leaves its loop at its next wake-up.



# File 'lib/openwfe/util/scheduler.rb', line 278

def sstop

    @stopped = true
end

#unschedule(job_id) ⇒ Object

Unschedules an ‘at’ or a ‘cron’ job identified by the id it was given at schedule time.



# File 'lib/openwfe/util/scheduler.rb', line 442

def unschedule (job_id)
    synchronize do

        for i in 0...@pending_jobs.length
            if @pending_jobs[i].job_id == job_id
                @pending_jobs.delete_at i
                return true
            end
        end

        unschedule_cron_job job_id
    end
end

#unschedule_cron_job(job_id) ⇒ Object

Unschedules a cron job. Returns true if a cron job with the given id was found and removed, false otherwise.



# File 'lib/openwfe/util/scheduler.rb', line 459

def unschedule_cron_job (job_id)
    synchronize do
        if @cron_jobs.has_key?(job_id)
            @cron_jobs.delete job_id
            return true
        end
        false
    end
end