Class: Rufus::Scheduler
- Inherits:
-
Object
- Object
- Rufus::Scheduler
- Defined in:
- lib/rufus/scheduler.rb,
lib/rufus/scheduler/util.rb
Defined Under Namespace
Classes: AtJob, CronJob, D, Error, EvInJob, EveryJob, FileLock, InJob, IntervalJob, Job, JobArray, NotRunningError, NullLock, OneTimeJob, RepeatJob, TimeoutError
Constant Summary collapse
- VERSION =
'3.9.2'
- EoTime =
::EtOrbi::EoTime
- MAX_WORK_THREADS =
MIN_WORK_THREADS = 3
28
Instance Attribute Summary collapse
-
#discard_past ⇒ Object
Returns the value of attribute discard_past.
-
#frequency ⇒ Object
Returns the value of attribute frequency.
-
#max_work_threads ⇒ Object
attr_accessor :min_work_threads.
-
#mutexes ⇒ Object
readonly
Returns the value of attribute mutexes.
-
#paused_at ⇒ Object
readonly
Returns the value of attribute paused_at.
-
#started_at ⇒ Object
readonly
Returns the value of attribute started_at.
-
#stderr ⇒ Object
Returns the value of attribute stderr.
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
-
#thread_key ⇒ Object
readonly
Returns the value of attribute thread_key.
-
#work_queue ⇒ Object
readonly
Returns the value of attribute work_queue.
Class Method Summary collapse
- .ltstamp ⇒ Object
-
.parse(o, opts = {}) ⇒ Object
– time and string methods ++.
- .parse_at(o, opts = {}) ⇒ Object
- .parse_cron(o, opts = {}) ⇒ Object
-
.parse_duration(str, opts = {}) ⇒ Object
Turns a string like ‘1m10s’ into a float like ‘70.0’, more formally, turns a time duration expressed as a string into a Float instance (millisecond count).
- .parse_in(o, opts = {}) ⇒ Object
-
.s(opts = {}) ⇒ Object
Alias for Rufus::Scheduler.singleton.
-
.singleton(opts = {}) ⇒ Object
Returns a singleton Rufus::Scheduler instance.
-
.start_new ⇒ Object
Releasing the gem would probably require redirecting .start_new to .new and emit a simple deprecation message.
-
.to_duration(seconds, options = {}) ⇒ Object
Turns a number of seconds into a a time string.
-
.to_duration_hash(seconds, options = {}) ⇒ Object
Turns a number of seconds (integer or Float) into a hash like in :.
-
.to_fugit_duration(seconds, options = {}) ⇒ Object
Used by both .to_duration and .to_duration_hash.
Instance Method Summary collapse
- #around_trigger(job) ⇒ Object
-
#at(time, callable = nil, opts = {}, &block) ⇒ Object
– scheduling methods ++.
- #at_jobs(opts = {}) ⇒ Object
-
#confirm_lock ⇒ Object
Callback called when a job is triggered.
- #cron(cronline, callable = nil, opts = {}, &block) ⇒ Object
- #cron_jobs(opts = {}) ⇒ Object
- #down? ⇒ Boolean
- #every(duration, callable = nil, opts = {}, &block) ⇒ Object
- #every_jobs(opts = {}) ⇒ Object
-
#h_to_s(t = Time.now) ⇒ Object
Produces a hour/min/sec/milli string representation of Time instance.
- #in(duration, callable = nil, opts = {}, &block) ⇒ Object
- #in_jobs(opts = {}) ⇒ Object
-
#initialize(opts = {}) ⇒ Scheduler
constructor
A new instance of Scheduler.
- #interval(duration, callable = nil, opts = {}, &block) ⇒ Object
- #interval_jobs(opts = {}) ⇒ Object
- #job(job_id) ⇒ Object
-
#jobs(opts = {}) ⇒ Object
Returns all the scheduled jobs (even those right before re-schedule).
- #join(time_limit = nil) ⇒ Object
-
#lock ⇒ Object
Returns true if the scheduler has acquired the [exclusive] lock and thus may run.
- #monow ⇒ Object
- #occurrences(time0, time1, format = :per_job) ⇒ Object
- #on_error(job, err) ⇒ Object
- #pause ⇒ Object
- #paused? ⇒ Boolean
- #repeat(arg, callable = nil, opts = {}, &block) ⇒ Object
- #resume(opts = {}) ⇒ Object
- #running_jobs(opts = {}) ⇒ Object
- #schedule(arg, callable = nil, opts = {}, &block) ⇒ Object
- #schedule_at(time, callable = nil, opts = {}, &block) ⇒ Object
- #schedule_cron(cronline, callable = nil, opts = {}, &block) ⇒ Object
- #schedule_every(duration, callable = nil, opts = {}, &block) ⇒ Object
- #schedule_in(duration, callable = nil, opts = {}, &block) ⇒ Object
- #schedule_interval(duration, callable = nil, opts = {}, &block) ⇒ Object
-
#scheduled?(job_or_job_id) ⇒ Boolean
Returns true if this job is currently scheduled.
- #shutdown(opt = nil) ⇒ Object (also: #stop)
-
#threads ⇒ Object
Lists all the threads associated with this scheduler.
- #timeline(time0, time1) ⇒ Object
-
#unlock ⇒ Object
Sister method to #lock, is called when the scheduler shuts down.
- #unschedule(job_or_job_id) ⇒ Object
- #up? ⇒ Boolean
- #uptime ⇒ Object
- #uptime_s ⇒ Object
-
#utc_to_s(t = Time.now) ⇒ Object
Produces the UTC string representation of a Time instance.
-
#work_threads(query = :all) ⇒ Object
Lists all the work threads (the ones actually running the scheduled block code).
Constructor Details
#initialize(opts = {}) ⇒ Scheduler
Returns a new instance of Scheduler.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/rufus/scheduler.rb', line 58 def initialize(opts={}) @opts = opts @started_at = nil @paused_at = nil @jobs = JobArray.new @frequency = Rufus::Scheduler.parse(opts[:frequency] || 0.300) @discard_past = opts.has_key?(:discard_past) ? opts[:discard_past] : true @mutexes = {} @work_queue = Queue.new @join_queue = Queue.new #@min_work_threads = # opts[:min_work_threads] || opts[:min_worker_threads] || # MIN_WORK_THREADS @max_work_threads = opts[:max_work_threads] || opts[:max_worker_threads] || MAX_WORK_THREADS @stderr = $stderr @thread_key = "rufus_scheduler_#{self.object_id}" @scheduler_lock = if lockfile = opts[:lockfile] Rufus::Scheduler::FileLock.new(lockfile) else opts[:scheduler_lock] || Rufus::Scheduler::NullLock.new end @trigger_lock = opts[:trigger_lock] || Rufus::Scheduler::NullLock.new # If we can't grab the @scheduler_lock, don't run. lock || return start end |
Instance Attribute Details
#discard_past ⇒ Object
Returns the value of attribute discard_past.
43 44 45 |
# File 'lib/rufus/scheduler.rb', line 43 def discard_past @discard_past end |
#frequency ⇒ Object
Returns the value of attribute frequency.
42 43 44 |
# File 'lib/rufus/scheduler.rb', line 42 def frequency @frequency end |
#max_work_threads ⇒ Object
attr_accessor :min_work_threads
52 53 54 |
# File 'lib/rufus/scheduler.rb', line 52 def max_work_threads @max_work_threads end |
#mutexes ⇒ Object (readonly)
Returns the value of attribute mutexes.
49 50 51 |
# File 'lib/rufus/scheduler.rb', line 49 def mutexes @mutexes end |
#paused_at ⇒ Object (readonly)
Returns the value of attribute paused_at.
46 47 48 |
# File 'lib/rufus/scheduler.rb', line 46 def paused_at @paused_at end |
#started_at ⇒ Object (readonly)
Returns the value of attribute started_at.
45 46 47 |
# File 'lib/rufus/scheduler.rb', line 45 def started_at @started_at end |
#stderr ⇒ Object
Returns the value of attribute stderr.
54 55 56 |
# File 'lib/rufus/scheduler.rb', line 54 def stderr @stderr end |
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
47 48 49 |
# File 'lib/rufus/scheduler.rb', line 47 def thread @thread end |
#thread_key ⇒ Object (readonly)
Returns the value of attribute thread_key.
48 49 50 |
# File 'lib/rufus/scheduler.rb', line 48 def thread_key @thread_key end |
#work_queue ⇒ Object (readonly)
Returns the value of attribute work_queue.
56 57 58 |
# File 'lib/rufus/scheduler.rb', line 56 def work_queue @work_queue end |
Class Method Details
.ltstamp ⇒ Object
206 |
# File 'lib/rufus/scheduler/util.rb', line 206 def ltstamp; Time.now.strftime('%FT%T.%3N'); end |
.parse(o, opts = {}) ⇒ Object
– time and string methods ++
10 11 12 13 14 15 16 17 18 |
# File 'lib/rufus/scheduler/util.rb', line 10 def parse(o, opts={}) opts[:no_error] = true parse_cron(o, opts) || parse_in(o, opts) || # covers 'every' schedule strings parse_at(o, opts) || fail(ArgumentError.new("couldn't parse #{o.inspect} (#{o.class})")) end |
.parse_at(o, opts = {}) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/rufus/scheduler/util.rb', line 42 def parse_at(o, opts={}) return o if o.is_a?(EoTime) return EoTime.make(o) if o.is_a?(Time) EoTime.parse(o, opts) rescue StandardError => se return nil if opts[:no_error] fail se end |
.parse_cron(o, opts = {}) ⇒ Object
20 21 22 23 24 25 |
# File 'lib/rufus/scheduler/util.rb', line 20 def parse_cron(o, opts={}) opts[:no_error] ? Fugit.parse_cron(o) : Fugit.do_parse_cron(o) end |
.parse_duration(str, opts = {}) ⇒ Object
Turns a string like ‘1m10s’ into a float like ‘70.0’, more formally, turns a time duration expressed as a string into a Float instance (millisecond count).
w -> week d -> day h -> hour m -> minute s -> second M -> month y -> year ‘nada’ -> millisecond
Some examples:
Rufus::Scheduler.parse_duration "0.5" # => 0.5
Rufus::Scheduler.parse_duration "500" # => 0.5
Rufus::Scheduler.parse_duration "1000" # => 1.0
Rufus::Scheduler.parse_duration "1h" # => 3600.0
Rufus::Scheduler.parse_duration "1h10s" # => 3610.0
Rufus::Scheduler.parse_duration "1w2d" # => 777600.0
Negative time strings are OK (Thanks Danny Fullerton):
Rufus::Scheduler.parse_duration "-0.5" # => -0.5
Rufus::Scheduler.parse_duration "-1h" # => -3600.0
81 82 83 84 85 86 87 88 89 90 |
# File 'lib/rufus/scheduler/util.rb', line 81 def parse_duration(str, opts={}) d = opts[:no_error] ? Fugit::Duration.parse(str, opts) : Fugit::Duration.do_parse(str, opts) d ? d.to_sec : nil end |
.parse_in(o, opts = {}) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/rufus/scheduler/util.rb', line 27 def parse_in(o, opts={}) #o.is_a?(String) ? parse_duration(o, opts) : o return parse_duration(o, opts) if o.is_a?(String) return o if o.is_a?(Numeric) fail ArgumentError.new("couldn't parse time point in #{o.inspect}") rescue ArgumentError => ae return nil if opts[:no_error] fail ae end |
.s(opts = {}) ⇒ Object
Alias for Rufus::Scheduler.singleton
110 |
# File 'lib/rufus/scheduler.rb', line 110 def self.s(opts={}); singleton(opts); end |
.singleton(opts = {}) ⇒ Object
Returns a singleton Rufus::Scheduler instance
103 104 105 106 |
# File 'lib/rufus/scheduler.rb', line 103 def self.singleton(opts={}) @singleton ||= Rufus::Scheduler.new(opts) end |
.start_new ⇒ Object
Releasing the gem would probably require redirecting .start_new to .new and emit a simple deprecation message.
For now, let’s assume the people pointing at rufus-scheduler/master on GitHub know what they do…
118 119 120 121 |
# File 'lib/rufus/scheduler.rb', line 118 def self.start_new fail 'this is rufus-scheduler 3.x, use .new instead of .start_new' end |
.to_duration(seconds, options = {}) ⇒ Object
Turns a number of seconds into a a time string
Rufus.to_duration 0 # => '0s'
Rufus.to_duration 60 # => '1m'
Rufus.to_duration 3661 # => '1h1m1s'
Rufus.to_duration 7 * 24 * 3600 # => '1w'
Rufus.to_duration 30 * 24 * 3600 + 1 # => "4w2d1s"
It goes from seconds to the year. Months are not counted (as they are of variable length). Weeks are counted.
For 30 days months to be counted, the second parameter of this method can be set to true.
Rufus.to_duration 30 * 24 * 3600 + 1, true # => "1M1s"
If a Float value is passed, milliseconds will be displayed without ‘marker’
Rufus.to_duration 0.051 # => "51"
Rufus.to_duration 7.051 # => "7s51"
Rufus.to_duration 0.120 + 30 * 24 * 3600 + 1 # => "4w2d1s120"
(this behaviour mirrors the one found for parse_time_string()).
Options are :
-
:months, if set to true, months (M) of 30 days will be taken into account when building up the result
-
:drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result
124 125 126 127 128 129 130 131 132 |
# File 'lib/rufus/scheduler/util.rb', line 124 def to_duration(seconds, ={}) #d = Fugit::Duration.parse(seconds, options).deflate #d = d.drop_seconds if options[:drop_seconds] #d = d.deflate(:month => options[:months]) if options[:months] #d.to_rufus_s to_fugit_duration(seconds, ).to_rufus_s end |
.to_duration_hash(seconds, options = {}) ⇒ Object
Turns a number of seconds (integer or Float) into a hash like in :
Rufus.to_duration_hash 0.051
# => { :s => 0.051 }
Rufus.to_duration_hash 7.051
# => { :s => 7.051 }
Rufus.to_duration_hash 0.120 + 30 * 24 * 3600 + 1
# => { :w => 4, :d => 2, :s => 1.120 }
This method is used by to_duration behind the scenes.
Options are :
-
:months, if set to true, months (M) of 30 days will be taken into account when building up the result
-
:drop_seconds, if set to true, seconds and milliseconds will be trimmed from the result
152 153 154 155 |
# File 'lib/rufus/scheduler/util.rb', line 152 def to_duration_hash(seconds, ={}) to_fugit_duration(seconds, ).to_rufus_h end |
.to_fugit_duration(seconds, options = {}) ⇒ Object
Used by both .to_duration and .to_duration_hash
159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/rufus/scheduler/util.rb', line 159 def to_fugit_duration(seconds, ={}) d = Fugit::Duration .parse(seconds, ) .deflate d = d.drop_seconds if [:drop_seconds] d = d.deflate(:month => [:months]) if [:months] d end |
Instance Method Details
#around_trigger(job) ⇒ Object
128 129 130 131 |
# File 'lib/rufus/scheduler.rb', line 128 def around_trigger(job) yield end |
#at(time, callable = nil, opts = {}, &block) ⇒ Object
– scheduling methods ++
184 185 186 187 |
# File 'lib/rufus/scheduler.rb', line 184 def at(time, callable=nil, opts={}, &block) do_schedule(:once, time, callable, opts, opts[:job], block) end |
#at_jobs(opts = {}) ⇒ Object
295 296 297 298 |
# File 'lib/rufus/scheduler.rb', line 295 def at_jobs(opts={}) jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::AtJob) } end |
#confirm_lock ⇒ Object
Callback called when a job is triggered. If the lock cannot be acquired, the job won’t run (though it’ll still be scheduled to run again if necessary).
363 364 365 366 |
# File 'lib/rufus/scheduler.rb', line 363 def confirm_lock @trigger_lock.lock end |
#cron(cronline, callable = nil, opts = {}, &block) ⇒ Object
224 225 226 227 |
# File 'lib/rufus/scheduler.rb', line 224 def cron(cronline, callable=nil, opts={}, &block) do_schedule(:cron, cronline, callable, opts, opts[:job], block) end |
#cron_jobs(opts = {}) ⇒ Object
315 316 317 318 |
# File 'lib/rufus/scheduler.rb', line 315 def cron_jobs(opts={}) jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::CronJob) } end |
#down? ⇒ Boolean
152 153 154 155 |
# File 'lib/rufus/scheduler.rb', line 152 def down? ! @started_at end |
#every(duration, callable = nil, opts = {}, &block) ⇒ Object
204 205 206 207 |
# File 'lib/rufus/scheduler.rb', line 204 def every(duration, callable=nil, opts={}, &block) do_schedule(:every, duration, callable, opts, opts[:job], block) end |
#every_jobs(opts = {}) ⇒ Object
305 306 307 308 |
# File 'lib/rufus/scheduler.rb', line 305 def every_jobs(opts={}) jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::EveryJob) } end |
#h_to_s(t = Time.now) ⇒ Object
Produces a hour/min/sec/milli string representation of Time instance
187 188 189 |
# File 'lib/rufus/scheduler/util.rb', line 187 def h_to_s(t=Time.now) t.strftime('%T.%6N') end |
#in(duration, callable = nil, opts = {}, &block) ⇒ Object
194 195 196 197 |
# File 'lib/rufus/scheduler.rb', line 194 def in(duration, callable=nil, opts={}, &block) do_schedule(:once, duration, callable, opts, opts[:job], block) end |
#in_jobs(opts = {}) ⇒ Object
300 301 302 303 |
# File 'lib/rufus/scheduler.rb', line 300 def in_jobs(opts={}) jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::InJob) } end |
#interval(duration, callable = nil, opts = {}, &block) ⇒ Object
214 215 216 217 |
# File 'lib/rufus/scheduler.rb', line 214 def interval(duration, callable=nil, opts={}, &block) do_schedule(:interval, duration, callable, opts, opts[:job], block) end |
#interval_jobs(opts = {}) ⇒ Object
310 311 312 313 |
# File 'lib/rufus/scheduler.rb', line 310 def interval_jobs(opts={}) jobs(opts).select { |j| j.is_a?(Rufus::Scheduler::IntervalJob) } end |
#job(job_id) ⇒ Object
320 321 322 323 |
# File 'lib/rufus/scheduler.rb', line 320 def job(job_id) @jobs[job_id] end |
#jobs(opts = {}) ⇒ Object
Returns all the scheduled jobs (even those right before re-schedule).
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/rufus/scheduler.rb', line 277 def jobs(opts={}) opts = { opts => true } if opts.is_a?(Symbol) jobs = @jobs.to_a if opts[:running] jobs = jobs.select { |j| j.running? } elsif ! opts[:all] jobs = jobs.reject { |j| j.next_time.nil? || j.unscheduled_at } end = Array(opts[:tag] || opts[:tags]).collect(&:to_s) jobs = jobs.reject { |j| .find { |t| ! j..include?(t) } } jobs end |
#join(time_limit = nil) ⇒ Object
138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/rufus/scheduler.rb', line 138 def join(time_limit=nil) fail NotRunningError.new('cannot join scheduler that is not running') \ unless @thread fail ThreadError.new('scheduler thread cannot join itself') \ if @thread == Thread.current if time_limit time_limit_join(time_limit) else no_time_limit_join end end |
#lock ⇒ Object
Returns true if the scheduler has acquired the [exclusive] lock and thus may run.
Most of the time, a scheduler is run alone and this method should return true. It is useful in cases where among a group of applications only one of them should run the scheduler. For schedulers that should not run, the method should return false.
Out of the box, rufus-scheduler proposes the :lockfile => ‘path/to/lock/file’ scheduler start option. It makes it easy for schedulers on the same machine to determine which should run (the first to write the lockfile and lock it). It uses “man 2 flock” so it probably won’t work reliably on distributed file systems.
If one needs to use a special/different locking mechanism, the scheduler accepts :scheduler_lock => lock_object. lock_object only needs to respond to #lock and #unlock, and both of these methods should be idempotent.
Look at rufus/scheduler/locks.rb for an example.
346 347 348 349 |
# File 'lib/rufus/scheduler.rb', line 346 def lock @scheduler_lock.lock end |
#monow ⇒ Object
740 |
# File 'lib/rufus/scheduler.rb', line 740 def monow; self.class.monow; end |
#occurrences(time0, time1, format = :per_job) ⇒ Object
416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 |
# File 'lib/rufus/scheduler.rb', line 416 def occurrences(time0, time1, format=:per_job) h = {} jobs.each do |j| os = j.occurrences(time0, time1) h[j] = os if os.any? end if format == :timeline a = [] h.each { |j, ts| ts.each { |t| a << [ t, j ] } } a.sort_by { |(t, _)| t } else h end end |
#on_error(job, err) ⇒ Object
439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 |
# File 'lib/rufus/scheduler.rb', line 439 def on_error(job, err) pre = err.object_id.to_s ms = {}; mutexes.each { |k, v| ms[k] = v.locked? } stderr.puts("{ #{pre} rufus-scheduler intercepted an error:") if job stderr.puts(" #{pre} job:") stderr.puts(" #{pre} #{job.class} #{job.original.inspect} #{job.opts.inspect}") stderr.puts(" #{pre} #{job.source_location.inspect}") # TODO: eventually use a Job#detail or something like that else stderr.puts(" #{pre} job: (error did not occur in a job)") end stderr.puts(" #{pre} error:") stderr.puts(" #{pre} #{err.object_id}") stderr.puts(" #{pre} #{err.class}") stderr.puts(" #{pre} #{err}") err.backtrace.each do |l| stderr.puts(" #{pre} #{l}") end stderr.puts(" #{pre} tz:") stderr.puts(" #{pre} ENV['TZ']: #{ENV['TZ']}") stderr.puts(" #{pre} Time.now: #{Time.now}") stderr.puts(" #{pre} local_tzone: #{EoTime.local_tzone.inspect}") stderr.puts(" #{pre} et-orbi:") stderr.puts(" #{pre} #{EoTime.platform_info}") stderr.puts(" #{pre} scheduler:") stderr.puts(" #{pre} object_id: #{object_id}") stderr.puts(" #{pre} opts:") stderr.puts(" #{pre} #{@opts.inspect}") stderr.puts(" #{pre} frequency: #{self.frequency}") stderr.puts(" #{pre} scheduler_lock: #{@scheduler_lock.inspect}") stderr.puts(" #{pre} trigger_lock: #{@trigger_lock.inspect}") stderr.puts(" #{pre} uptime: #{uptime} (#{uptime_s})") stderr.puts(" #{pre} down?: #{down?}") stderr.puts(" #{pre} frequency: #{frequency.inspect}") stderr.puts(" #{pre} discard_past: #{discard_past.inspect}") stderr.puts(" #{pre} started_at: #{started_at.inspect}") stderr.puts(" #{pre} paused_at: #{paused_at.inspect}") stderr.puts(" #{pre} threads: #{self.threads.size}") stderr.puts(" #{pre} thread: #{self.thread}") stderr.puts(" #{pre} thread_key: #{self.thread_key}") stderr.puts(" #{pre} work_threads: #{work_threads.size}") stderr.puts(" #{pre} active: #{work_threads(:active).size}") stderr.puts(" #{pre} vacant: #{work_threads(:vacant).size}") stderr.puts(" #{pre} max_work_threads: #{max_work_threads}") stderr.puts(" #{pre} mutexes: #{ms.inspect}") stderr.puts(" #{pre} jobs: #{jobs.size}") stderr.puts(" #{pre} at_jobs: #{at_jobs.size}") stderr.puts(" #{pre} in_jobs: #{in_jobs.size}") stderr.puts(" #{pre} every_jobs: #{every_jobs.size}") stderr.puts(" #{pre} interval_jobs: #{interval_jobs.size}") stderr.puts(" #{pre} cron_jobs: #{cron_jobs.size}") stderr.puts(" #{pre} running_jobs: #{running_jobs.size}") stderr.puts(" #{pre} work_queue:") stderr.puts(" #{pre} size: #{@work_queue.size}") stderr.puts(" #{pre} num_waiting: #{@work_queue.num_waiting}") stderr.puts(" #{pre} join_queue:") stderr.puts(" #{pre} size: #{@join_queue.size}") stderr.puts(" #{pre} num_waiting: #{@join_queue.num_waiting}") stderr.puts("} #{pre} .") rescue => e stderr.puts("failure in #on_error itself:") stderr.puts(e.inspect) stderr.puts(e.backtrace) ensure stderr.flush end |
#pause ⇒ Object
167 168 169 170 |
# File 'lib/rufus/scheduler.rb', line 167 def pause @paused_at = EoTime.now end |
#paused? ⇒ Boolean
162 163 164 165 |
# File 'lib/rufus/scheduler.rb', line 162 def paused? !! @paused_at end |
#repeat(arg, callable = nil, opts = {}, &block) ⇒ Object
248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/rufus/scheduler.rb', line 248 def repeat(arg, callable=nil, opts={}, &block) callable, opts = nil, callable if callable.is_a?(Hash) opts = opts.dup opts[:_t] = Rufus::Scheduler.parse(arg, opts) case opts[:_t] when ::Fugit::Cron then schedule_cron(arg, callable, opts, &block) else schedule_every(arg, callable, opts, &block) end end |
#resume(opts = {}) ⇒ Object
172 173 174 175 176 177 178 |
# File 'lib/rufus/scheduler.rb', line 172 def resume(opts={}) dp = opts[:discard_past] jobs.each { |job| job.resume_discard_past = dp } @paused_at = nil end |
#running_jobs(opts = {}) ⇒ Object
411 412 413 414 |
# File 'lib/rufus/scheduler.rb', line 411 def running_jobs(opts={}) jobs(opts.merge(:running => true)) end |
#schedule(arg, callable = nil, opts = {}, &block) ⇒ Object
234 235 236 237 238 239 240 241 242 243 244 245 246 |
# File 'lib/rufus/scheduler.rb', line 234 def schedule(arg, callable=nil, opts={}, &block) callable, opts = nil, callable if callable.is_a?(Hash) opts = opts.dup opts[:_t] = Rufus::Scheduler.parse(arg, opts) case opts[:_t] when ::Fugit::Cron then schedule_cron(arg, callable, opts, &block) when ::EtOrbi::EoTime, Time then schedule_at(arg, callable, opts, &block) else schedule_in(arg, callable, opts, &block) end end |
#schedule_at(time, callable = nil, opts = {}, &block) ⇒ Object
189 190 191 192 |
# File 'lib/rufus/scheduler.rb', line 189 def schedule_at(time, callable=nil, opts={}, &block) do_schedule(:once, time, callable, opts, true, block) end |
#schedule_cron(cronline, callable = nil, opts = {}, &block) ⇒ Object
229 230 231 232 |
# File 'lib/rufus/scheduler.rb', line 229 def schedule_cron(cronline, callable=nil, opts={}, &block) do_schedule(:cron, cronline, callable, opts, true, block) end |
#schedule_every(duration, callable = nil, opts = {}, &block) ⇒ Object
209 210 211 212 |
# File 'lib/rufus/scheduler.rb', line 209 def schedule_every(duration, callable=nil, opts={}, &block) do_schedule(:every, duration, callable, opts, true, block) end |
#schedule_in(duration, callable = nil, opts = {}, &block) ⇒ Object
199 200 201 202 |
# File 'lib/rufus/scheduler.rb', line 199 def schedule_in(duration, callable=nil, opts={}, &block) do_schedule(:once, duration, callable, opts, true, block) end |
#schedule_interval(duration, callable = nil, opts = {}, &block) ⇒ Object
219 220 221 222 |
# File 'lib/rufus/scheduler.rb', line 219 def schedule_interval(duration, callable=nil, opts={}, &block) do_schedule(:interval, duration, callable, opts, true, block) end |
#scheduled?(job_or_job_id) ⇒ Boolean
Returns true if this job is currently scheduled.
Takes extra care to answer true if the job is a repeat job currently firing.
373 374 375 376 377 378 |
# File 'lib/rufus/scheduler.rb', line 373 def scheduled?(job_or_job_id) job, _ = fetch(job_or_job_id) !! (job && job.unscheduled_at.nil? && job.next_time != nil) end |
#shutdown(opt = nil) ⇒ Object Also known as: stop
514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 |
# File 'lib/rufus/scheduler.rb', line 514 def shutdown(opt=nil) opts = case opt when Symbol then { opt => true } when Hash then opt else {} end @jobs.unschedule_all if opts[:wait] || opts[:join] join_shutdown(opts) elsif opts[:kill] kill_shutdown(opts) else regular_shutdown(opts) end @work_queue.clear unlock @thread.join unless @thread == Thread.current end |
#threads ⇒ Object
Lists all the threads associated with this scheduler.
382 383 384 385 |
# File 'lib/rufus/scheduler.rb', line 382 def threads Thread.list.select { |t| t[thread_key] } end |
#timeline(time0, time1) ⇒ Object
434 435 436 437 |
# File 'lib/rufus/scheduler.rb', line 434 def timeline(time0, time1) occurrences(time0, time1, :timeline) end |
#unlock ⇒ Object
Sister method to #lock, is called when the scheduler shuts down.
353 354 355 356 357 |
# File 'lib/rufus/scheduler.rb', line 353 def unlock @trigger_lock.unlock @scheduler_lock.unlock end |
#unschedule(job_or_job_id) ⇒ Object
261 262 263 264 265 266 267 268 |
# File 'lib/rufus/scheduler.rb', line 261 def unschedule(job_or_job_id) job, job_id = fetch(job_or_job_id) fail ArgumentError.new("no job found with id '#{job_id}'") unless job job.unschedule if job end |
#up? ⇒ Boolean
157 158 159 160 |
# File 'lib/rufus/scheduler.rb', line 157 def up? !! @started_at end |
#uptime ⇒ Object
123 124 125 126 |
# File 'lib/rufus/scheduler.rb', line 123 def uptime @started_at ? EoTime.now - @started_at : nil end |
#uptime_s ⇒ Object
133 134 135 136 |
# File 'lib/rufus/scheduler.rb', line 133 def uptime_s uptime ? self.class.to_duration(uptime) : '' end |
#utc_to_s(t = Time.now) ⇒ Object
Produces the UTC string representation of a Time instance
like “2009/11/23 11:11:50.947109 UTC”
181 182 183 |
# File 'lib/rufus/scheduler/util.rb', line 181 def utc_to_s(t=Time.now) "#{t.dup.utc.strftime('%F %T.%6N')} UTC" end |
#work_threads(query = :all) ⇒ Object
Lists all the work threads (the ones actually running the scheduled block code)
Accepts a query option, which can be set to:
-
:all (default), returns all the threads that are work threads or are currently running a job
-
:active, returns all threads that are currently running a job
-
:vacant, returns the threads that are not running a job
If, thanks to :blocking => true, a job is scheduled to monopolize the main scheduler thread, that thread will get returned when :active or :all.
400 401 402 403 404 405 406 407 408 409 |
# File 'lib/rufus/scheduler.rb', line 400 def work_threads(query=:all) ts = threads.select { |t| t[:rufus_scheduler_work_thread] } case query when :active then ts.select { |t| t[:rufus_scheduler_job] } when :vacant then ts.reject { |t| t[:rufus_scheduler_job] } else ts end end |