Class: Resque::Scheduler

Inherits:
Object
  • Object
show all
Extended by:
Helpers
Defined in:
lib/resque/scheduler.rb

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.airbrakeObject

If set, will tell rufus to use the airbrake exception handler



23
24
25
# File 'lib/resque/scheduler.rb', line 23

def airbrake
  @airbrake
end

.dynamicObject

If set, will try to update the schulde in the loop



20
21
22
# File 'lib/resque/scheduler.rb', line 20

def dynamic
  @dynamic
end

.muteObject

If set, produces no output



17
18
19
# File 'lib/resque/scheduler.rb', line 17

def mute
  @mute
end

.verboseObject

If true, logs more stuff…



14
15
16
# File 'lib/resque/scheduler.rb', line 14

def verbose
  @verbose
end

Class Method Details

.clear_schedule!Object

Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler



186
187
188
189
190
191
# File 'lib/resque/scheduler.rb', line 186

def clear_schedule!
  rufus_scheduler.stop
  @rufus_scheduler = nil
  @@scheduled_jobs = {}
  rufus_scheduler
end

.enqueue_delayed_items_for_timestamp(timestamp) ⇒ Object

Enqueues all delayed jobs for a timestamp



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/resque/scheduler.rb', line 128

def enqueue_delayed_items_for_timestamp(timestamp)
  item = nil
  begin
    handle_shutdown do
      if item = Resque.next_item_for_timestamp(timestamp)
        log "queuing #{item['class']} [delayed]"
        klass = constantize(item['class'])
        queue = item['queue'] || Resque.queue_from_class(klass)
        # Support custom job classes like job with status
        if (job_klass = item['custom_job_class']) && (job_klass != 'Resque::Job')
          # custom job classes not supporting the same API calls must implement the #schedule method
          constantize(job_klass).scheduled(queue, item['class'], *item['args'])
        else
          Resque.enqueue_to(queue, klass, *item['args'])
          #Resque::Job.create(queue, klass, *item['args'])
        end
      end
    end
  # continue processing until there are no more ready items in this timestamp
  end while !item.nil?
end

.enqueue_from_config(config) ⇒ Object

Enqueues a job based on a config hash



157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/resque/scheduler.rb', line 157

def enqueue_from_config(config)
  args = config['args'] || config[:args]
  klass_name = config['class'] || config[:class]
  klass = constantize(klass_name)
  params = args.nil? ? [] : Array(args)
  queue = config['queue'] || config[:queue] || Resque.queue_from_class(klass)
  # Support custom job classes like job with status
  if (job_klass = config['custom_job_class']) && (job_klass != 'Resque::Job')
    # custom job classes not supporting the same API calls must implement the #schedule method
    constantize(job_klass).scheduled(queue, klass_name, *params)
  else
    Resque.enqueue_to(queue, klass, *params)
  end
end

.handle_delayed_items(at_time = nil) ⇒ Object

Handles queueing delayed items



117
118
119
120
121
122
123
124
125
# File 'lib/resque/scheduler.rb', line 117

def handle_delayed_items(at_time = nil)
  if timestamp = Resque.next_delayed_timestamp(at_time)
    procline "Processing Delayed Items"
    while !timestamp.nil?
      enqueue_delayed_items_for_timestamp(timestamp)
      timestamp = Resque.next_delayed_timestamp(at_time)
    end
  end
end

.handle_shutdownObject



150
151
152
153
154
# File 'lib/resque/scheduler.rb', line 150

def handle_shutdown
  exit if @shutdown
  yield
  exit if @shutdown
end

.load_schedule!Object

Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance



68
69
70
71
72
73
74
75
76
77
78
# File 'lib/resque/scheduler.rb', line 68

def load_schedule!
  log! "Schedule empty! Set Resque.schedule" if Resque.schedule.empty?

  @@scheduled_jobs = {}

  Resque.schedule.each do |name, config|
    load_schedule_job(name, config)
  end
  Resque.redis.del(:schedules_changed)
  procline "Schedules Loaded"
end

.load_schedule_job(name, config) ⇒ Object

Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/resque/scheduler.rb', line 81

def load_schedule_job(name, config)
  # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
  # required for the jobs to be scheduled.  If rails_env is missing, the
  # job should be scheduled regardless of what ENV['RAILS_ENV'] is set
  # to.
  if config['rails_env'].nil? || rails_env_matches?(config)
    log! "Scheduling #{name} "
    interval_defined = false
    interval_types = %w{cron every}
    interval_types.each do |interval_type|
      if !config[interval_type].nil? && config[interval_type].length > 0
        begin
          @@scheduled_jobs[name] = rufus_scheduler.send(interval_type, config[interval_type]) do
            log! "queueing #{config['class']} (#{name})"
            enqueue_from_config(config)
          end
        rescue Exception => e
          log! "#{e.class.name}: #{e.message}"
        end
        interval_defined = true
        break
      end
    end
    unless interval_defined
      log! "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping"
    end
  end
end

.log(msg) ⇒ Object



242
243
244
245
# File 'lib/resque/scheduler.rb', line 242

def log(msg)
  # add "verbose" logic later
  log!(msg) if verbose
end

.log!(msg) ⇒ Object



238
239
240
# File 'lib/resque/scheduler.rb', line 238

def log!(msg)
  puts "#{Time.now.strftime("%Y-%m-%d %H:%M:%S")} #{msg}" unless mute
end

.poll_sleepObject

Sleeps and returns true



225
226
227
228
229
230
# File 'lib/resque/scheduler.rb', line 225

def poll_sleep
  @sleeping = true
  handle_shutdown { sleep 5 }
  @sleeping = false
  true
end

.procline(string) ⇒ Object



247
248
249
250
# File 'lib/resque/scheduler.rb', line 247

def procline(string)
  $0 = "resque-scheduler-#{ResqueScheduler::Version}: #{string}"
  log! $0
end

.rails_env_matches?(config) ⇒ Boolean

Returns true if the given schedule config hash matches the current ENV

Returns:

  • (Boolean)


112
113
114
# File 'lib/resque/scheduler.rb', line 112

def rails_env_matches?(config)
  config['rails_env'] && ENV['RAILS_ENV'] && config['rails_env'].gsub(/\s/,'').split(',').include?(ENV['RAILS_ENV'])
end

.register_signal_handlersObject

For all signals, set the shutdown flag and wait for current poll/enqueing to finish (should be almost istant). In the case of sleeping, exit immediately.



53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/resque/scheduler.rb', line 53

def register_signal_handlers
  trap("TERM") { shutdown }
  trap("INT") { shutdown }

  begin
    trap('QUIT') { shutdown   }
    trap('USR1') { kill_child }
    trap('USR2') { reload_schedule! }
  rescue ArgumentError
    warn "Signals QUIT and USR1 and USR2 not supported."
  end
end

.reload_schedule!Object



193
194
195
196
197
198
# File 'lib/resque/scheduler.rb', line 193

def reload_schedule!
  procline "Reloading Schedule"
  clear_schedule!
  Resque.reload_schedule!
  load_schedule!
end

.rufus_schedulerObject



172
173
174
175
176
177
178
179
180
181
182
# File 'lib/resque/scheduler.rb', line 172

def rufus_scheduler
  @rufus_scheduler ||= Rufus::Scheduler.start_new
  if self.airbrake
    def @rufus_scheduler.handle_exception(job, exception)
      puts "Exception caught, notifying Airbrake"
      id = Airbrake.notify(exception)
      puts "Airbrake id: #{id}"
    end
  end
  @rufus_scheduler
end

.runObject

Schedule all jobs and continually look for delayed jobs (never returns)



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/resque/scheduler.rb', line 31

def run
  $0 = "resque-scheduler: Starting"
  # trap signals
  register_signal_handlers

  # Load the schedule into rufus
  procline "Loading Schedule"
  load_schedule!

  # Now start the scheduling part of the loop.
  loop do
    handle_delayed_items
    update_schedule if dynamic
    poll_sleep
  end

  # never gets here.
end

.scheduled_jobsObject

the Rufus::Scheduler jobs that are scheduled



26
27
28
# File 'lib/resque/scheduler.rb', line 26

def scheduled_jobs
  @@scheduled_jobs
end

.shutdownObject

Sets the shutdown flag, exits if sleeping



233
234
235
236
# File 'lib/resque/scheduler.rb', line 233

def shutdown
  @shutdown = true
  exit if @sleeping
end

.unschedule_job(name) ⇒ Object



216
217
218
219
220
221
222
# File 'lib/resque/scheduler.rb', line 216

def unschedule_job(name)
  if scheduled_jobs[name]
    log "Removing schedule #{name}"
    scheduled_jobs[name].unschedule
    @@scheduled_jobs.delete(name)
  end
end

.update_scheduleObject



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/resque/scheduler.rb', line 200

def update_schedule
  if Resque.redis.scard(:schedules_changed) > 0
    procline "Updating schedule"
    Resque.reload_schedule!
    while schedule_name = Resque.redis.spop(:schedules_changed)
      if Resque.schedule.keys.include?(schedule_name)
        unschedule_job(schedule_name)
        load_schedule_job(schedule_name, Resque.schedule[schedule_name])
      else
        unschedule_job(schedule_name)
      end
    end
  end
  procline "Schedules Loaded"
end