Class: Resque::Scheduler
- Inherits:
- Object
- Extended by:
- Helpers
- Defined in:
- lib/resque/scheduler.rb
Class Attribute Summary
-
.mute ⇒ Object
If set, produces no output.
-
.verbose ⇒ Object
If true, also prints messages passed to .log (messages passed to .log! are printed regardless).
Class Method Summary
-
.clear_schedule! ⇒ Object
Stops old rufus scheduler and creates a new one.
-
.enqueue_from_config(config) ⇒ Object
Enqueues a job based on a config hash.
-
.handle_delayed_items ⇒ Object
Handles queueing delayed items.
- .handle_shutdown ⇒ Object
-
.load_schedule! ⇒ Object
Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance.
- .log(msg) ⇒ Object
- .log!(msg) ⇒ Object
-
.poll_sleep ⇒ Object
Sleeps and returns true.
-
.register_signal_handlers ⇒ Object
For TERM, INT, and QUIT, sets the shutdown flag and waits for the current poll/enqueuing to finish (should be almost instant).
- .rufus_scheduler ⇒ Object
-
.run ⇒ Object
Schedule all jobs and continually look for delayed jobs (never returns).
-
.shutdown ⇒ Object
Sets the shutdown flag, exits if sleeping.
Class Attribute Details
.mute ⇒ Object
If set, produces no output
    # File 'lib/resque/scheduler.rb', line 24

    def mute
      @mute
    end
.verbose ⇒ Object
If true, also prints messages passed to .log (messages passed to .log! are printed regardless).

    # File 'lib/resque/scheduler.rb', line 21

    def verbose
      @verbose
    end
Class Method Details
.clear_schedule! ⇒ Object
Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler
    # File 'lib/resque/scheduler.rb', line 120

    def clear_schedule!
      rufus_scheduler.stop
      @rufus_scheduler = nil
      rufus_scheduler
    end
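The stop-then-rebuild behaviour depends on the memoized `rufus_scheduler` accessor: once the instance variable is cleared, the next call builds a fresh instance. The following is a minimal sketch of that pattern only. `FakeScheduler` and `SchedulerHolder` are hypothetical stand-ins, not part of Resque or Rufus.

```ruby
# Hypothetical stand-in for a scheduler; it only records whether stop was called.
class FakeScheduler
  attr_reader :stopped

  def initialize
    @stopped = false
  end

  def stop
    @stopped = true
  end
end

# Hypothetical holder that mirrors the memoize-and-reset pattern.
module SchedulerHolder
  class << self
    # Builds the scheduler on first use and reuses it afterwards.
    def scheduler
      @scheduler ||= FakeScheduler.new
    end

    # Stops the current scheduler, drops it, and returns a fresh one.
    def clear!
      scheduler.stop
      @scheduler = nil
      scheduler
    end
  end
end

old_one = SchedulerHolder.scheduler
new_one = SchedulerHolder.clear!
```

After `clear!`, `old_one` has been stopped and later calls to `SchedulerHolder.scheduler` return `new_one`.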
.enqueue_from_config(config) ⇒ Object
Enqueues a job based on a config hash
    # File 'lib/resque/scheduler.rb', line 99

    def enqueue_from_config(config)
      args = config['args'] || config[:args]
      klass_name = config['class'] || config[:class]
      params = args.nil? ? [] : Array(args)
      queue = config['queue'] || config[:queue] || Resque.queue_from_class(constantize(klass_name))
      if (config[:just_once] || config['just_once'])
        Resque::Job.destroy(queue, klass_name, *params)
      end
      if klass_name.include?("WithStatus")
        Resque::JobWithStatus.enqueue_with_queue(queue, klass_name, *params)
      else
        Resque::Job.create(queue, klass_name, *params)
      end
    end
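To make the key handling concrete, here is a small sketch of how the config hash is read: string and symbol keys are both accepted, and `args` is normalised with `Array()`. The helper `resolve_job_config` and the job names are hypothetical. The sketch deliberately leaves out the Resque calls so it runs without Redis.

```ruby
# Hypothetical helper: resolves the same keys enqueue_from_config reads,
# without touching Resque or Redis.
def resolve_job_config(config)
  args = config['args'] || config[:args]
  {
    class_name: config['class'] || config[:class],
    params: args.nil? ? [] : Array(args),
    # nil here means "derive the queue from the job class"
    queue: config['queue'] || config[:queue],
    just_once: !!(config[:just_once] || config['just_once'])
  }
end

string_keys = resolve_job_config('class' => 'ReportJob', 'args' => 'daily', 'queue' => 'reports')
symbol_keys = resolve_job_config(class: 'ReportJob', args: [1, 2], just_once: true)
```

A single non-array argument such as `'daily'` is wrapped into a one-element list. Note that `Array()` turns a Hash into an array of key/value pairs, so a hash argument is probably best wrapped in an array explicitly.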
.handle_delayed_items ⇒ Object
Handles queueing delayed items
    # File 'lib/resque/scheduler.rb', line 72

    def handle_delayed_items
      item = nil
      begin
        if timestamp = Resque.next_delayed_timestamp
          item = nil
          begin
            handle_shutdown do
              if item = Resque.next_item_for_timestamp(timestamp)
                log "queuing #{item['class']} [delayed]"
                queue = item['queue'] || Resque.queue_from_class(constantize(item['class']))
                Job.create(queue, item['class'], *item['args'])
              end
            end
          # continue processing until there are no more ready items in this timestamp
          end while !item.nil?
        end
      # continue processing until there are no more ready timestamps
      end while !timestamp.nil?
    end
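The two nested loops drain every ready timestamp and, within each timestamp, every queued item. The sketch below reproduces that control flow against an in-memory store. `FakeDelayedStore`, its methods, and `drain_ready_items` are hypothetical names chosen for illustration and are not the Resque API.

```ruby
# Hypothetical in-memory stand-in for the delayed-job store.
class FakeDelayedStore
  def initialize(items_by_timestamp)
    @items = items_by_timestamp
  end

  # Earliest timestamp due at or before `now`, or nil if none is ready.
  def next_ready_timestamp(now)
    @items.keys.select { |ts| ts <= now }.min
  end

  # Removes and returns the next item for a timestamp, or nil once it is drained.
  def next_item_for(timestamp)
    queue = @items[timestamp]
    return nil if queue.nil?

    item = queue.shift
    @items.delete(timestamp) if queue.empty?
    item
  end
end

# Drains every ready timestamp, and every item within each timestamp.
def drain_ready_items(store, now)
  enqueued = []
  timestamp = nil
  begin
    if (timestamp = store.next_ready_timestamp(now))
      item = nil
      begin
        item = store.next_item_for(timestamp)
        enqueued << item unless item.nil?
      end while !item.nil?
    end
  end while !timestamp.nil?
  enqueued
end

store = FakeDelayedStore.new(
  100 => [{ 'class' => 'A' }, { 'class' => 'B' }],
  200 => [{ 'class' => 'C' }],
  900 => [{ 'class' => 'Later' }]
)
ready = drain_ready_items(store, 500)
```

With `now` set to 500, the items stored under 100 and 200 are drained in order, and the entry under 900 stays in the store for a later poll.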
.handle_shutdown ⇒ Object
    # File 'lib/resque/scheduler.rb', line 92

    def handle_shutdown
      exit if @shutdown
      yield
      exit if @shutdown
    end
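The check-yield-check shape means a shutdown request never interrupts the block halfway: the process exits either before the block starts or after it completes. A minimal sketch of that behaviour follows. `MiniLoop` and `request_shutdown` are hypothetical, and `SystemExit` is rescued only so that both cases can be observed in one script.

```ruby
# Hypothetical miniature of the shutdown flag and the handle_shutdown guard.
class MiniLoop
  def initialize
    @shutdown = false
  end

  # Only sets the flag; the guard below decides when to exit.
  def request_shutdown
    @shutdown = true
  end

  # Exits before or after the block if shutdown was requested.
  def handle_shutdown
    exit if @shutdown
    yield
    exit if @shutdown
  end
end

# Case 1: shutdown is requested while the block is running.
during = MiniLoop.new
work_done = false
exited_after_work = false
begin
  during.handle_shutdown do
    during.request_shutdown
    work_done = true # the block still runs to completion
  end
rescue SystemExit
  exited_after_work = true
end

# Case 2: shutdown was requested before the block started.
before = MiniLoop.new
before.request_shutdown
block_ran = false
exited_before_work = false
begin
  before.handle_shutdown { block_ran = true }
rescue SystemExit
  exited_before_work = true
end
```

In case 1 the work finishes and the exit follows. In case 2 the block is never entered.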
.load_schedule! ⇒ Object
Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance
    # File 'lib/resque/scheduler.rb', line 55

    def load_schedule!
      log! "Schedule empty! Set Resque.schedule" if Resque.schedule.empty?

      Resque.schedule.each do |name, config|
        log! "Scheduling #{name} "
        if !config['cron'].nil? && config['cron'].length > 0
          rufus_scheduler.cron config['cron'] do
            log! "queuing #{config['class']} (#{name})"
            enqueue_from_config(config)
          end
        else
          log! "no cron found for #{config['class']} (#{name}) - skipping"
        end
      end
    end
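As an illustration of the schedule shape this method iterates over, the sketch below builds a hash of names mapped to config hashes and applies the same cron check. The entry names, job classes, and the `schedulable?` helper are hypothetical.

```ruby
# Hypothetical schedule: each name maps to a config hash with string keys.
schedule = {
  'nightly_report' => { 'cron' => '0 2 * * *', 'class' => 'ReportJob', 'args' => 'nightly' },
  'no_cron_entry'  => { 'class' => 'OrphanJob' },
  'blank_cron'     => { 'cron' => '', 'class' => 'BlankJob' }
}

# Same condition load_schedule! applies: cron must be present and non-empty.
def schedulable?(config)
  !config['cron'].nil? && config['cron'].length > 0
end

# Split the entries into those that would be scheduled and those that would be skipped.
scheduled, skipped = schedule.partition { |_name, config| schedulable?(config) }
scheduled_names = scheduled.map(&:first)
skipped_names = skipped.map(&:first)
```

Only `nightly_report` passes the check. As the listing shows, the check reads the string key `'cron'` only, so an entry keyed with the symbol `:cron` would also be skipped.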
.log(msg) ⇒ Object
    # File 'lib/resque/scheduler.rb', line 144

    def log(msg)
      # add "verbose" logic later
      log!(msg) if verbose
    end
.log!(msg) ⇒ Object
    # File 'lib/resque/scheduler.rb', line 140

    def log!(msg)
      puts "#{Time.now.strftime("%Y-%m-%d %H:%M:%S")} #{msg}" unless mute
    end
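The interaction between `mute`, `verbose`, `log`, and `log!` can be sketched as follows. `GatedLogger` is a hypothetical class that writes to an injected IO object instead of standard output, so the result can be inspected.

```ruby
require 'stringio'

# Hypothetical logger mirroring the mute/verbose gating of log and log!.
class GatedLogger
  attr_accessor :mute, :verbose

  def initialize(io)
    @io = io
  end

  # Always prints a timestamped line unless muted.
  def log!(msg)
    @io.puts "#{Time.now.strftime('%Y-%m-%d %H:%M:%S')} #{msg}" unless mute
  end

  # Prints only in verbose mode, and still respects mute.
  def log(msg)
    log!(msg) if verbose
  end
end

out = StringIO.new
logger = GatedLogger.new(out)
logger.log('hidden: verbose is off')
logger.log!('shown: important')
logger.verbose = true
logger.log('shown: verbose is on')
logger.mute = true
logger.log!('hidden: muted')
lines = out.string.lines
```

Two lines end up in `out`: the `log!` message and the `log` message sent after `verbose` was enabled. Once `mute` is set, nothing is printed.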
.poll_sleep ⇒ Object
Sleeps and returns true
    # File 'lib/resque/scheduler.rb', line 127

    def poll_sleep
      @sleeping = true
      handle_shutdown { sleep 5 }
      @sleeping = false
      true
    end
.register_signal_handlers ⇒ Object
For TERM, INT, and QUIT (QUIT is not trapped on JRuby), sets the shutdown flag and waits for the current poll/enqueuing to finish (should be almost instant). If the scheduler is sleeping, it exits immediately.

    # File 'lib/resque/scheduler.rb', line 47

    def register_signal_handlers
      trap("TERM") { shutdown }
      trap("INT") { shutdown }
      trap('QUIT') { shutdown } unless defined? JRUBY_VERSION
    end
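A handler that only sets a flag can be sketched with plain Ruby, as below. The sketch assumes MRI on a Unix-like system and uses a local variable where the class uses `@shutdown`. It sends TERM to its own process to show that the handler replaces the default action.

```ruby
# Local flag standing in for the class-level @shutdown (an assumption of this sketch).
shutdown_requested = false

# Install the handler and remember the previous one so it can be restored.
previous = Signal.trap('TERM') { shutdown_requested = true }

# Send TERM to this very process; the handler runs instead of the default action.
Process.kill('TERM', Process.pid)
sleep 0.1 # give the interpreter a moment to run the handler

# Restore the earlier handler; fall back to the default if none was recorded.
Signal.trap('TERM', previous || 'DEFAULT')
```

Keeping the handler down to a flag assignment also avoids the restrictions current Ruby versions place on trap blocks, such as not being able to lock a Mutex there.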
.rufus_scheduler ⇒ Object
    # File 'lib/resque/scheduler.rb', line 114

    def rufus_scheduler
      @rufus_scheduler ||= Rufus::Scheduler.start_new
    end
.run ⇒ Object
Schedule all jobs and continually look for delayed jobs (never returns)
    # File 'lib/resque/scheduler.rb', line 27

    def run

      # trap signals
      register_signal_handlers

      # Load the schedule into rufus
      load_schedule!

      # Now start the scheduling part of the loop.
      loop do
        handle_delayed_items
        poll_sleep
      end

      # never gets here.
    end
.shutdown ⇒ Object
Sets the shutdown flag, exits if sleeping
    # File 'lib/resque/scheduler.rb', line 135

    def shutdown
      @shutdown = true
      exit if @sleeping
    end