Module: ResqueScheduler

Included in:
Resque
Defined in:
lib/resque_scheduler/server.rb,
lib/resque_scheduler.rb,
lib/resque_scheduler/cli.rb,
lib/resque_scheduler/util.rb,
lib/resque_scheduler/plugin.rb,
lib/resque_scheduler/version.rb,
lib/resque_scheduler/logger_builder.rb

Overview

Extend Resque::Server to add tabs

Defined Under Namespace

Modules: Plugin, Server Classes: Cli, LoggerBuilder, Util

Constant Summary collapse

VERSION =
'2.5.5'

Instance Method Summary collapse

Instance Method Details

#clean_schedulesObject

Cleans the schedules as they exist in Redis by removing every schedule that is not marked as persisted; useful for first setup.



93
94
95
96
97
98
99
100
101
# File 'lib/resque_scheduler.rb', line 93

# Removes every entry of the redis :schedules hash that is not reported as
# persisted by schedule_persisted?, then drops the memoized schedule so the
# next read goes back to redis.
#
# @return [true] always
def clean_schedules
  names = redis.exists(:schedules) ? redis.hkeys(:schedules) : []
  names.each do |name|
    next if schedule_persisted?(name)

    remove_schedule(name)
  end

  @schedule = nil
  true
end

#count_all_scheduled_jobsObject



333
334
335
336
337
338
339
# File 'lib/resque_scheduler.rb', line 333

# Sums the lengths of the "delayed:<timestamp>" lists for every timestamp
# stored in the :delayed_queue_schedule sorted set.
#
# @return [Integer] total number of delayed jobs
def count_all_scheduled_jobs
  timestamps = Array(redis.zrange(:delayed_queue_schedule, 0, -1))
  timestamps.inject(0) do |total, timestamp|
    total + redis.llen("delayed:#{timestamp}").to_i
  end
end

#delayed_push(timestamp, item) ⇒ Object

Used internally to stuff the item into the schedule sorted list. timestamp can be either in seconds or a datetime object. Insertion is O(log(n)). Returns true if it’s the first job to be scheduled at that time, else false.



194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/resque_scheduler.rb', line 194

# Stores an item in the delayed queue under the given timestamp.
#
# @param timestamp [#to_i] when the item should fire
# @param item [Object] job description; serialized with encode
# @return [Object] whatever redis.zadd returns
def delayed_push(timestamp, item)
  score    = timestamp.to_i
  list_key = "delayed:#{score}"
  payload  = encode(item)

  # Append the item to the list of jobs due at this timestamp.
  redis.rpush(list_key, payload)

  # Remember the timestamp list at which this item occurs, so it can be
  # looked up by payload later.
  redis.sadd("timestamps:#{payload}", list_key)

  # Register the timestamp in the sorted set. Score and member are the same
  # value because lookups are done by timestamp and nothing else is stored.
  redis.zadd(:delayed_queue_schedule, score, score)
end

#delayed_queue_peek(start, count) ⇒ Object

Returns an array of timestamps based on start and count



208
209
210
# File 'lib/resque_scheduler.rb', line 208

# Reads a window of the :delayed_queue_schedule sorted set.
#
# @param start [Integer] index of the first entry
# @param count [Integer] number of entries requested
# @return [Array<Integer>] the entries converted with to_i
def delayed_queue_peek(start, count)
  stop = start + count - 1
  Array(redis.zrange(:delayed_queue_schedule, start, stop)).map(&:to_i)
end

#delayed_queue_schedule_sizeObject

Returns the size of the delayed queue schedule



213
214
215
# File 'lib/resque_scheduler.rb', line 213

# Number of entries in the :delayed_queue_schedule sorted set.
#
# @return [Object] the reply of redis.zcard
def delayed_queue_schedule_size
  redis.zcard(:delayed_queue_schedule)
end

#delayed_timestamp_peek(timestamp, start, count) ⇒ Object

Returns an array of delayed items for the given timestamp



223
224
225
226
227
228
229
230
# File 'lib/resque_scheduler.rb', line 223

# Reads delayed items stored under "delayed:<timestamp>" via list_range.
#
# When count is 1 the list_range result is wrapped in an array (or replaced
# by an empty array when it is nil); presumably list_range returns a bare
# item rather than an array in that case -- confirm against its definition.
#
# @param timestamp [#to_i]
# @param start [Integer]
# @param count [Integer]
# @return [Array] for count == 1; otherwise whatever list_range returns
def delayed_timestamp_peek(timestamp, start, count)
  result = list_range("delayed:#{timestamp.to_i}", start, count)
  return result unless count == 1

  result.nil? ? [] : [result]
end

#delayed_timestamp_size(timestamp) ⇒ Object

Returns the number of jobs for a given timestamp in the delayed queue schedule



218
219
220
# File 'lib/resque_scheduler.rb', line 218

# Length of the "delayed:<timestamp>" list.
#
# @param timestamp [#to_i]
# @return [Integer]
def delayed_timestamp_size(timestamp)
  list_key = "delayed:#{timestamp.to_i}"
  redis.llen(list_key).to_i
end

#enqueue_at(timestamp, klass, *args) ⇒ Object

This method is nearly identical to enqueue only it also takes a timestamp which will be used to schedule the job for queueing. Until timestamp is in the past, the job will sit in the schedule list.



149
150
151
152
# File 'lib/resque_scheduler.rb', line 149

# Validates klass, looks up its queue with queue_from_class and delegates to
# enqueue_at_with_queue.
#
# @param timestamp [#to_i] when the job should be queued
# @param klass [Object] job class
# @param args [Array] job arguments
# @return [Object] the result of enqueue_at_with_queue
def enqueue_at(timestamp, klass, *args)
  validate(klass)
  target_queue = queue_from_class(klass)
  enqueue_at_with_queue(target_queue, timestamp, klass, *args)
end

#enqueue_at_with_queue(queue, timestamp, klass, *args) ⇒ Object

Identical to enqueue_at, except you can also specify a queue in which the job will be placed after the timestamp has passed. It respects Resque.inline option, by creating the job right away instead of adding to the queue.



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/resque_scheduler.rb', line 158

# Schedules klass on the given queue at timestamp.
#
# The job is created right away, instead of being pushed to the delayed
# queue, when Resque.inline? is true or the timestamp is already in the past.
#
# @param queue [Object] destination queue
# @param timestamp [#to_i] when the job should be queued
# @param klass [Object] job class
# @param args [Array] job arguments
# @return [false, Object] false when the before-schedule hooks return a
#   falsy value; otherwise the result of the after-schedule hooks
def enqueue_at_with_queue(queue, timestamp, klass, *args)
  return false unless Plugin.run_before_schedule_hooks(klass, *args)

  immediate = Resque.inline? || timestamp.to_i < Time.now.to_i

  if immediate && klass.respond_to?(:scheduled)
    # A custom job class gets its own self.scheduled hook, which allows calls
    # such as Resque.enqueue_at(timestamp, CustomJobClass, :opt1 => val1).
    klass.scheduled(queue, klass.to_s, *args)
  elsif immediate
    # Otherwise hand the job to Resque directly.
    Resque::Job.create(queue, klass, *args)
  else
    delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args))
  end

  Plugin.run_after_schedule_hooks(klass, *args)
end

#enqueue_delayed(klass, *args) ⇒ Object

Given a job class and its arguments, removes the matching delayed jobs and enqueues one job immediately for each job removed.



285
286
287
288
# File 'lib/resque_scheduler.rb', line 285

# Removes the delayed jobs matching klass/args and enqueues the job through
# Resque::Scheduler.enqueue_from_config once per removed job.
#
# @param klass [Object] job class
# @param args [Array] job arguments
# @return [Integer] the count returned by remove_delayed
def enqueue_delayed(klass, *args)
  job = job_to_hash(klass, args)
  removed_count = remove_delayed(klass, *args)
  removed_count.times { Resque::Scheduler.enqueue_from_config(job) }
end

#enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object

Identical to enqueue_at but takes number_of_seconds_from_now instead of a timestamp.



179
180
181
# File 'lib/resque_scheduler.rb', line 179

# Schedules klass to be enqueued a number of seconds from now by delegating
# to enqueue_at with Time.now + number_of_seconds_from_now.
#
# @param number_of_seconds_from_now [Numeric]
# @param klass [Object] job class
# @param args [Array] job arguments
# @return [Object] the result of enqueue_at
def enqueue_in(number_of_seconds_from_now, klass, *args)
  run_at = Time.now + number_of_seconds_from_now
  enqueue_at(run_at, klass, *args)
end

#enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args) ⇒ Object

Identical to enqueue_in, except you can also specify a queue in which the job will be placed after the number of seconds has passed.



186
187
188
# File 'lib/resque_scheduler.rb', line 186

# Same as enqueue_in, but the destination queue is given explicitly; delegates
# to enqueue_at_with_queue with Time.now + number_of_seconds_from_now.
#
# @param queue [Object] destination queue
# @param number_of_seconds_from_now [Numeric]
# @param klass [Object] job class
# @param args [Array] job arguments
# @return [Object] the result of enqueue_at_with_queue
def enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args)
  run_at = Time.now + number_of_seconds_from_now
  enqueue_at_with_queue(queue, run_at, klass, *args)
end

#get_schedule(name) ⇒ Object

Retrieves the schedule configuration for the given name.



128
129
130
# File 'lib/resque_scheduler.rb', line 128

# Reads the entry stored under name in the redis :schedules hash and runs it
# through decode.
#
# @param name [String] schedule name
# @return [Object] the decoded configuration
def get_schedule(name)
  raw_config = redis.hget(:schedules, name)
  decode(raw_config)
end

#get_schedulesObject

Gets the schedules as they exist in Redis.



80
81
82
83
84
85
86
87
88
89
90
# File 'lib/resque_scheduler.rb', line 80

# Loads the whole redis :schedules hash and decodes every value in place.
#
# @return [Hash, nil] name => decoded config, or nil when redis.exists
#   reports a falsy value for :schedules
def get_schedules
  return nil unless redis.exists(:schedules)

  schedules = redis.hgetall(:schedules)
  schedules.keys.each do |name|
    schedules[name] = decode(schedules[name])
  end
  schedules
end

#next_delayed_timestamp(at_time = nil) ⇒ Object

Returns the next delayed queue timestamp (don’t call directly)



234
235
236
237
238
# File 'lib/resque_scheduler.rb', line 234

# Finds the lowest-scored entry of :delayed_queue_schedule whose score is at
# most at_time (defaults to the current time).
#
# @param at_time [#to_i, nil] upper bound; Time.now when nil
# @return [Integer, nil] the timestamp, or nil when nothing is due
def next_delayed_timestamp(at_time=nil)
  cutoff = (at_time || Time.now).to_i
  due = redis.zrangebyscore(:delayed_queue_schedule, '-inf', cutoff, :limit => [0, 1])
  earliest = Array(due).first
  earliest.nil? ? nil : earliest.to_i
end

#next_item_for_timestamp(timestamp) ⇒ Object

Returns the next item to be processed for a given timestamp, nil if done. (don’t call directly) timestamp can either be in seconds or a datetime



243
244
245
246
247
248
249
250
251
252
253
# File 'lib/resque_scheduler.rb', line 243

# Pops the next item from the "delayed:<timestamp>" list.
#
# @param timestamp [#to_i] seconds or a time-like object
# @return [Object] the result of decode applied to whatever LPOP returned
def next_item_for_timestamp(timestamp)
  key = "delayed:#{timestamp.to_i}"

  encoded_item = redis.lpop(key)
  # Only update the reverse index when something was actually popped;
  # otherwise the SREM would target the meaningless key "timestamps:".
  redis.srem("timestamps:#{encoded_item}", key) unless encoded_item.nil?
  item = decode(encoded_item)

  # Let clean_up_timestamp decide whether the list is now empty and has to
  # be removed.
  clean_up_timestamp(key, timestamp)
  item
end

#reload_schedule!Object

reloads the schedule from redis



75
76
77
# File 'lib/resque_scheduler.rb', line 75

# Replaces the memoized schedule with a fresh copy from get_schedules.
#
# @return [Object] the freshly loaded schedule (result of get_schedules)
def reload_schedule!
  fresh = get_schedules
  @schedule = fresh
end

#remove_delayed(klass, *args) ⇒ Object

Given a job class and its arguments, removes every matching job from the delayed queue and returns the number of jobs removed.



270
271
272
273
274
275
276
277
278
279
280
281
282
# File 'lib/resque_scheduler.rb', line 270

# Removes every delayed job matching klass/args from all timestamp lists
# recorded in its "timestamps:<payload>" set.
#
# @param klass [Object] job class
# @param args [Array] job arguments
# @return [Integer] number of list entries removed (sum of the LREM replies)
def remove_delayed(klass, *args)
  search = encode(job_to_hash(klass, args))
  index_key = "timestamps:#{search}"
  list_keys = redis.smembers(index_key)

  replies = redis.pipelined do
    list_keys.each do |list_key|
      redis.lrem(list_key, 0, search)
      redis.srem(index_key, list_key)
    end
  end

  return 0 if replies.nil? || replies.empty?

  # Replies alternate LREM, SREM; only the LREM counts are summed.
  replies.each_slice(2).map(&:first).inject(:+)
end

#remove_delayed_job_from_timestamp(timestamp, klass, *args) ⇒ Object

Given a timestamp and job (klass + args) it removes all instances and returns the count of jobs removed.

O(N) where N is the number of jobs scheduled to fire at the given timestamp



322
323
324
325
326
327
328
329
330
331
# File 'lib/resque_scheduler.rb', line 322

# Removes every instance of the job (klass + args) scheduled at timestamp.
#
# @param timestamp [#to_i]
# @param klass [Object] job class
# @param args [Array] job arguments
# @return [Object] the reply of redis.lrem (count of removed entries)
def remove_delayed_job_from_timestamp(timestamp, klass, *args)
  list_key = "delayed:#{timestamp.to_i}"
  payload = encode(job_to_hash(klass, args))

  redis.srem("timestamps:#{payload}", list_key)
  redis.lrem(list_key, 0, payload).tap do
    # The list may have become empty; clean_up_timestamp handles that.
    clean_up_timestamp(list_key, timestamp)
  end
end

#remove_delayed_selectionObject

Given a block, remove jobs that return true from a block

This allows for removal of delayed jobs that have arguments matching certain criteria



293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/resque_scheduler.rb', line 293

# Removes every delayed job whose decoded 'args' make the given block return
# a truthy value.
#
# Redis cannot search list entries for a partial match, so every
# "delayed:*" list is scanned from the tail and each payload is decoded
# before it is handed to the block.
#
# @yieldparam args [Object] the 'args' entry of a decoded payload
# @yieldreturn [Boolean] truthy to remove the job
# @return [Integer] number of list entries removed
# @raise [ArgumentError] when no block is given
def remove_delayed_selection
  fail ArgumentError, "Please supply a block" unless block_given?

  destroyed = 0
  redis.keys("delayed:*").each do |list_key|
    index = redis.llen(list_key) - 1
    while index >= 0
      payload = redis.lindex(list_key, index)
      # The list may have shrunk since LLEN; a missing or undecodable entry
      # is skipped instead of raising on nil['args'].
      decoded_payload = payload.nil? ? nil : decode(payload)

      if decoded_payload && yield(decoded_payload['args'])
        # LREM with count 0 drops every occurrence of the payload in this
        # list, so its reverse-index entry for the list can go as well.
        removed = redis.lrem(list_key, 0, payload)
        redis.srem("timestamps:#{payload}", list_key)
        destroyed += removed
        # Every removed entry sat at or below index. Always step at least
        # one position so a zero-removal reply cannot stall the loop.
        index -= [removed, 1].max
      else
        index -= 1
      end
    end
  end
  destroyed
end

#remove_schedule(name) ⇒ Object

remove a given schedule by name



137
138
139
140
141
142
143
# File 'lib/resque_scheduler.rb', line 137

# Deletes the named schedule in a single pipelined round trip: the entry is
# dropped from the :schedules hash and from the :persisted_schedules set, and
# the name is added to the :schedules_changed set.
#
# @param name [String] schedule name
# @return [Object] the result of redis.pipelined
def remove_schedule(name)
  redis.pipelined do
    redis.hdel(:schedules, name)
    redis.srem(:persisted_schedules, name)
    redis.sadd(:schedules_changed, name)
  end
end

#reset_delayed_queueObject

Clears all jobs created with enqueue_at or enqueue_in



256
257
258
259
260
261
262
263
264
265
266
267
# File 'lib/resque_scheduler.rb', line 256

# Deletes everything the delayed queue stores: for each timestamp in
# :delayed_queue_schedule, the "timestamps:<payload>" key of every queued
# payload and the "delayed:<timestamp>" list itself, and finally the sorted
# set.
#
# @return [Object] the reply of the final redis.del
def reset_delayed_queue
  timestamps = Array(redis.zrange(:delayed_queue_schedule, 0, -1))

  timestamps.each do |timestamp|
    list_key = "delayed:#{timestamp}"
    payloads = redis.lrange(list_key, 0, -1)

    redis.pipelined do
      payloads.each { |payload| redis.del("timestamps:#{payload}") }
    end

    redis.del(list_key)
  end

  redis.del(:delayed_queue_schedule)
end

#scheduleObject

Returns the schedule hash



66
67
68
69
70
71
72
# File 'lib/resque_scheduler.rb', line 66

# Returns the schedule hash, loading it through get_schedules on first use
# and memoizing it afterwards.
#
# @return [Object] the memoized schedule, or an empty Hash while
#   get_schedules keeps returning nil
def schedule
  @schedule ||= get_schedules
  @schedule.nil? ? {} : @schedule
end

#schedule=(schedule_hash) ⇒ Object

Accepts a new schedule configuration of the form:

{
  "MakeTea" => {
    "every" => "1m" },
  "some_name" => {
    "cron"        => "5/* * * *",
    "class"       => "DoSomeWork",
    "args"        => "work on this string",
    "description" => "this thing works it's butter off" },
  ...
}

Hash keys can be anything and are used to describe and reference the scheduled job. If the “class” argument is missing, the key is used implicitly as “class” argument - in the “MakeTea” example, “MakeTea” is used both as job name and resque worker class.

Any jobs that were in the old schedule, but are not present in the new schedule, will be removed.

:cron can be any cron scheduling string

:every can be used in lieu of :cron. see rufus-scheduler’s ‘every’ usage for valid syntax. If :cron is present it will take precedence over :every.

:class must be a resque worker class. If it is missing, the job name (hash key) will be used as :class.

:args can be any YAML, which will be converted to a Ruby literal and passed in as params. (optional)

:rails_envs is the list of envs where the job gets loaded. Envs are comma separated (optional)

:description is just that, a description of the job (optional). If params is an array, each element in the array is passed as a separate param, otherwise params is passed in as the only parameter to perform.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/resque_scheduler.rb', line 50

# Replaces the stored schedule with schedule_hash.
#
# @param schedule_hash [Hash] name => job spec
# @return [Object] the result of reload_schedule! (only visible when the
#   method is invoked via send; plain assignment syntax yields the argument)
def schedule=(schedule_hash)
  # Start by cleaning the schedules that currently exist in redis.
  clean_schedules

  # Store every prepared schedule in redis so it can be read back anywhere.
  prepare_schedule(schedule_hash).each do |name, job_spec|
    set_schedule(name, job_spec)
  end

  # Reload from redis so only data that was actually saved is kept in memory.
  reload_schedule!
end

#schedule_persisted?(name) ⇒ Boolean

Returns:

  • (Boolean)


132
133
134
# File 'lib/resque_scheduler.rb', line 132

# Tells whether name is a member of the redis :persisted_schedules set.
#
# @param name [String] schedule name
# @return [Object] the reply of redis.sismember
def schedule_persisted?(name)
  persisted = redis.sismember(:persisted_schedules, name)
  persisted
end

#scheduled_at(klass, *args) ⇒ Object

Returns delayed jobs schedule timestamp for klass, args.



342
343
344
345
346
347
# File 'lib/resque_scheduler.rb', line 342

# Returns the timestamps at which the job (klass + args) is scheduled, read
# from its "timestamps:<payload>" set.
#
# @param klass [Object] job class
# @param args [Array] job arguments
# @return [Array<Integer>] one timestamp per "delayed:<timestamp>" member
def scheduled_at(klass, *args)
  search = encode(job_to_hash(klass, args))
  redis.smembers("timestamps:#{search}").map do |key|
    # Strip the literal "delayed:" prefix. String#tr would treat the string
    # as a set of characters to delete, which only works by accident.
    key.sub(/\Adelayed:/, '').to_i
  end
end

#set_schedule(name, config) ⇒ Object

Create or update a schedule with the provided name and configuration.

Note: values for class and custom_job_class need to be strings, not constants.

Resque.set_schedule('some_job', {:class => 'SomeJob',
                                 :every => '15mins',
                                 :queue => 'high',
                                 :args => '/tmp/poop'})


112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/resque_scheduler.rb', line 112

# Creates or updates the schedule stored under name.
#
# The :persist / 'persist' entry is not stored with the configuration; when
# truthy, the name is added to the :persisted_schedules set instead. Nothing
# is written when the stored configuration already equals the new one.
#
# @param name [String] schedule name
# @param config [Hash] schedule configuration; left untouched
# @return [Hash] a copy of config without the extracted persist entry
def set_schedule(name, config)
  existing_config = get_schedule(name)

  # Work on a shallow copy so the caller's hash keeps its persist flag and
  # frozen or shared configs can be passed safely.
  config = config.dup
  persist = config.delete(:persist) || config.delete('persist')

  unless existing_config && existing_config == config
    redis.pipelined do
      redis.hset(:schedules, name, encode(config))
      redis.sadd(:schedules_changed, name)
      redis.sadd(:persisted_schedules, name) if persist
    end
  end
  config
end