Module: ResqueScheduler

Included in:
Resque
Defined in:
lib/resque_scheduler/server.rb,
lib/resque_scheduler.rb,
lib/resque_scheduler/plugin.rb,
lib/resque_scheduler/version.rb

Overview

Extend Resque::Server to add tabs

Defined Under Namespace

Modules: Plugin, Server

Constant Summary collapse

VERSION =
'2.0.2'

Instance Method Summary collapse

Instance Method Details

#count_all_scheduled_jobsObject



241
242
243
244
245
246
247
# File 'lib/resque_scheduler.rb', line 241

def count_all_scheduled_jobs
  total_jobs = 0
  Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |timestamp|
    total_jobs += redis.llen("delayed:#{timestamp}").to_i
  end
  total_jobs
end

#delayed_push(timestamp, item) ⇒ Object

Used internally to stuff the item into the schedule sorted list. timestamp can be either in seconds or a datetime object Insertion if O(log(n)). Returns true if it’s the first job to be scheduled at that time, else false



151
152
153
154
155
156
157
158
159
# File 'lib/resque_scheduler.rb', line 151

def delayed_push(timestamp, item)
  # First add this item to the list for this timestamp
  redis.rpush("delayed:#{timestamp.to_i}", encode(item))

  # Now, add this timestamp to the zsets.  The score and the value are
  # the same since we'll be querying by timestamp, and we don't have
  # anything else to store.
  redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i
end

#delayed_queue_peek(start, count) ⇒ Object

Returns an array of timestamps based on start and count



162
163
164
# File 'lib/resque_scheduler.rb', line 162

def delayed_queue_peek(start, count)
  Array(redis.zrange(:delayed_queue_schedule, start, start+count-1)).collect{|x| x.to_i}
end

#delayed_queue_schedule_sizeObject

Returns the size of the delayed queue schedule



167
168
169
# File 'lib/resque_scheduler.rb', line 167

def delayed_queue_schedule_size
  redis.zcard :delayed_queue_schedule
end

#delayed_timestamp_peek(timestamp, start, count) ⇒ Object

Returns an array of delayed items for the given timestamp



177
178
179
180
181
182
183
184
# File 'lib/resque_scheduler.rb', line 177

def delayed_timestamp_peek(timestamp, start, count)
  if 1 == count
    r = list_range "delayed:#{timestamp.to_i}", start, count
    r.nil? ? [] : [r]
  else
    list_range "delayed:#{timestamp.to_i}", start, count
  end
end

#delayed_timestamp_size(timestamp) ⇒ Object

Returns the number of jobs for a given timestamp in the delayed queue schedule



172
173
174
# File 'lib/resque_scheduler.rb', line 172

def delayed_timestamp_size(timestamp)
  redis.llen("delayed:#{timestamp.to_i}").to_i
end

#enqueue_at(timestamp, klass, *args) ⇒ Object

This method is nearly identical to enqueue only it also takes a timestamp which will be used to schedule the job for queueing. Until timestamp is in the past, the job will sit in the schedule list.



112
113
114
115
# File 'lib/resque_scheduler.rb', line 112

def enqueue_at(timestamp, klass, *args)
  validate_job!(klass)
  enqueue_at_with_queue(queue_from_class(klass), timestamp, klass, *args)
end

#enqueue_at_with_queue(queue, timestamp, klass, *args) ⇒ Object

Identical to enqueue_at, except you can also specify a queue in which the job will be placed after the timestamp has passed. It respects Resque.inline option, by creating the job right away instead of adding to the queue.



121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/resque_scheduler.rb', line 121

def enqueue_at_with_queue(queue, timestamp, klass, *args)
  return false unless Plugin.run_before_schedule_hooks(klass, *args)

  if Resque.inline?
    # Just create the job and let resque perform it right away with inline.
    Resque::Job.create(queue, klass, *args)
  else
    delayed_push(timestamp, job_to_hash_with_queue(queue, klass, args))
  end

  Plugin.run_after_schedule_hooks(klass, *args)
end

#enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object

Identical to enqueue_at but takes number_of_seconds_from_now instead of a timestamp.



136
137
138
# File 'lib/resque_scheduler.rb', line 136

def enqueue_in(number_of_seconds_from_now, klass, *args)
  enqueue_at(Time.now + number_of_seconds_from_now, klass, *args)
end

#enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args) ⇒ Object

Identical to enqueue_in, except you can also specify a queue in which the job will be placed after the number of seconds has passed.



143
144
145
# File 'lib/resque_scheduler.rb', line 143

def enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args)
  enqueue_at_with_queue(queue, Time.now + number_of_seconds_from_now, klass, *args)
end

#get_schedule(name) ⇒ Object

retrive the schedule configuration for the given name



98
99
100
# File 'lib/resque_scheduler.rb', line 98

def get_schedule(name)
  decode(redis.hget(:schedules, name))
end

#get_schedulesObject

gets the schedule as it exists in redis



67
68
69
70
71
72
73
74
75
76
77
# File 'lib/resque_scheduler.rb', line 67

def get_schedules
  if redis.exists(:schedules)
    redis.hgetall(:schedules).tap do |h|
      h.each do |name, config|
        h[name] = decode(config)
      end
    end
  else
    nil
  end
end

#next_delayed_timestamp(at_time = nil) ⇒ Object

Returns the next delayed queue timestamp (don’t call directly)



188
189
190
191
192
# File 'lib/resque_scheduler.rb', line 188

def next_delayed_timestamp(at_time=nil)
  items = redis.zrangebyscore :delayed_queue_schedule, '-inf', (at_time || Time.now).to_i, :limit => [0, 1]
  timestamp = items.nil? ? nil : Array(items).first
  timestamp.to_i unless timestamp.nil?
end

#next_item_for_timestamp(timestamp) ⇒ Object

Returns the next item to be processed for a given timestamp, nil if done. (don’t call directly) timestamp can either be in seconds or a datetime



197
198
199
200
201
202
203
204
205
# File 'lib/resque_scheduler.rb', line 197

def next_item_for_timestamp(timestamp)
  key = "delayed:#{timestamp.to_i}"

  item = decode redis.lpop(key)

  # If the list is empty, remove it.
  clean_up_timestamp(key, timestamp)
  item
end

#reload_schedule!Object

reloads the schedule from redis



62
63
64
# File 'lib/resque_scheduler.rb', line 62

def reload_schedule!
  @schedule = get_schedules
end

#remove_delayed(klass, *args) ⇒ Object

Given an encoded item, remove it from the delayed_queue

This method is potentially very expensive since it needs to scan through the delayed queue for every timestamp.



220
221
222
223
224
225
226
227
# File 'lib/resque_scheduler.rb', line 220

def remove_delayed(klass, *args)
  destroyed = 0
  search = encode(job_to_hash(klass, args))
  Array(redis.keys("delayed:*")).each do |key|
    destroyed += redis.lrem key, 0, search
  end
  destroyed
end

#remove_delayed_job_from_timestamp(timestamp, klass, *args) ⇒ Object

Given a timestamp and job (klass + args) it removes all instances and returns the count of jobs removed.

O(N) where N is the number of jobs scheduled to fire at the given timestamp



234
235
236
237
238
239
# File 'lib/resque_scheduler.rb', line 234

def remove_delayed_job_from_timestamp(timestamp, klass, *args)
  key = "delayed:#{timestamp.to_i}"
  count = redis.lrem key, 0, encode(job_to_hash(klass, args))
  clean_up_timestamp(key, timestamp)
  count
end

#remove_schedule(name) ⇒ Object

remove a given schedule by name



103
104
105
106
# File 'lib/resque_scheduler.rb', line 103

def remove_schedule(name)
  redis.hdel(:schedules, name)
  redis.sadd(:schedules_changed, name)
end

#reset_delayed_queueObject

Clears all jobs created with enqueue_at or enqueue_in



208
209
210
211
212
213
214
# File 'lib/resque_scheduler.rb', line 208

def reset_delayed_queue
  Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item|
    redis.del "delayed:#{item}"
  end

  redis.del :delayed_queue_schedule
end

#scheduleObject

Returns the schedule hash



57
58
59
# File 'lib/resque_scheduler.rb', line 57

def schedule
  @schedule ||= {}
end

#schedule=(schedule_hash) ⇒ Object

Accepts a new schedule configuration of the form:

{
  "MakeTea" => {
    "every" => "1m" },
  "some_name" => {
    "cron"        => "5/* * * *",
    "class"       => "DoSomeWork",
    "args"        => "work on this string",
    "description" => "this thing works it"s butter off" },
  ...
}

Hash keys can be anything and are used to describe and reference the scheduled job. If the “class” argument is missing, the key is used implicitly as “class” argument - in the “MakeTea” example, “MakeTea” is used both as job name and resque worker class.

:cron can be any cron scheduling string

:every can be used in lieu of :cron. see rufus-scheduler’s ‘every’ usage for valid syntax. If :cron is present it will take precedence over :every.

:class must be a resque worker class. If it is missing, the job name (hash key) will be used as :class.

:args can be any yaml which will be converted to a ruby literal and passed in a params. (optional)

:rails_envs is the list of envs where the job gets loaded. Envs are comma separated (optional)

:description is just that, a description of the job (optional). If params is an array, each element in the array is passed as a separate param, otherwise params is passed in as the only parameter to perform.



45
46
47
48
49
50
51
52
53
54
# File 'lib/resque_scheduler.rb', line 45

def schedule=(schedule_hash)
  schedule_hash = prepare_schedule(schedule_hash)

  if Resque::Scheduler.dynamic
    schedule_hash.each do |name, job_spec|
      set_schedule(name, job_spec)
    end
  end
  @schedule = schedule_hash
end

#set_schedule(name, config) ⇒ Object

Create or update a schedule with the provided name and configuration.

Note: values for class and custom_job_class need to be strings, not constants.

Resque.set_schedule('some_job', {:class => 'SomeJob',
                                 :every => '15mins',
                                 :queue => 'high',
                                 :args => '/tmp/poop'})


88
89
90
91
92
93
94
95
# File 'lib/resque_scheduler.rb', line 88

def set_schedule(name, config)
  existing_config = get_schedule(name)
  unless existing_config && existing_config == config
    redis.hset(:schedules, name, encode(config))
    redis.sadd(:schedules_changed, name)
  end
  config
end