Module: ResqueScheduler
- Included in:
- Resque
- Defined in:
- lib/resque_scheduler/server.rb,
lib/resque_scheduler.rb,
lib/resque_scheduler/version.rb
Overview
Extend Resque::Server to add tabs
Defined Under Namespace
Modules: Server
Constant Summary collapse
- Version =
'1.10.14'
Instance Method Summary collapse
- #count_all_scheduled_jobs ⇒ Object
-
#delayed_push(timestamp, item) ⇒ Object
Used internally to stuff the item into the schedule sorted list.
-
#delayed_queue_peek(start, count) ⇒ Object
Returns an array of timestamps based on start and count.
-
#delayed_queue_schedule_size ⇒ Object
Returns the size of the delayed queue schedule.
-
#delayed_timestamp_peek(timestamp, start, count) ⇒ Object
Returns an array of delayed items for the given timestamp.
-
#delayed_timestamp_size(timestamp) ⇒ Object
Returns the number of jobs for a given timestamp in the delayed queue schedule.
-
#enqueue_at(timestamp, klass, *args) ⇒ Object
This method is nearly identical to enqueue, except that it also takes a timestamp, which is used to schedule the job for queueing.
-
#enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object
Identical to enqueue_at but takes number_of_seconds_from_now instead of a timestamp.
-
#get_schedule(name) ⇒ Object
Retrieves the schedule configuration for the given name.
-
#get_schedules ⇒ Object
gets the schedule as it exists in redis.
-
#next_delayed_timestamp(at_time = nil) ⇒ Object
Returns the next delayed queue timestamp (don’t call directly).
-
#next_item_for_timestamp(timestamp) ⇒ Object
Returns the next item to be processed for a given timestamp, nil if done.
-
#reload_schedule! ⇒ Object
reloads the schedule from redis.
-
#remove_delayed(klass, *args) ⇒ Object
Given a job class and its arguments, removes every matching item from the delayed queue.
-
#remove_schedule(name) ⇒ Object
remove a given schedule by name.
-
#reset_delayed_queue ⇒ Object
Clears all jobs created with enqueue_at or enqueue_in.
-
#schedule ⇒ Object
Returns the schedule hash.
-
#schedule=(schedule_hash) ⇒ Object
Accepts a new schedule configuration hash.
-
#set_schedule(name, config) ⇒ Object
create or update a schedule with the provided name and configuration.
Instance Method Details
#count_all_scheduled_jobs ⇒ Object
# File 'lib/resque_scheduler.rb', line 176
def count_all_scheduled_jobs
  total_jobs = 0
  Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |timestamp|
    total_jobs += redis.llen("delayed:#{timestamp}").to_i
  end
  total_jobs
end
#delayed_push(timestamp, item) ⇒ Object
Used internally to stuff the item into the schedule sorted list. timestamp can be either in seconds or a datetime object. Insertion is O(log(n)). Returns true if it’s the first job to be scheduled at that time, else false.
# File 'lib/resque_scheduler.rb', line 101
def delayed_push(timestamp, item)
  # First add this item to the list for this timestamp
  redis.rpush("delayed:#{timestamp.to_i}", encode(item))

  # Now, add this timestamp to the zsets. The score and the value are
  # the same since we'll be querying by timestamp, and we don't have
  # anything else to store.
  redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i
end
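The two-structure layout used by delayed_push (one list per timestamp plus one sorted set of timestamps) can be sketched without Redis. This is a minimal in-memory model, not the library's implementation: the class InMemoryDelayedQueue, its helper methods and the job names are hypothetical, and JSON stands in for whatever encode does.

```ruby
require 'json'

# Hypothetical in-memory stand-in for the two Redis structures used above:
# one list per timestamp ("delayed:<ts>") and one sorted set of timestamps.
class InMemoryDelayedQueue
  def initialize
    @lists = Hash.new { |h, k| h[k] = [] } # plays the role of the Redis lists
    @schedule = []                         # plays the role of the sorted set
  end

  # Mirrors delayed_push: append the encoded item, then record the timestamp.
  # Returns true only when the timestamp was not already scheduled.
  def delayed_push(timestamp, item)
    ts = timestamp.to_i
    @lists["delayed:#{ts}"] << JSON.generate(item)
    return false if @schedule.include?(ts)

    @schedule << ts
    @schedule.sort!
    true
  end

  # Scheduled timestamps in ascending order.
  def timestamps
    @schedule.dup
  end

  # Decoded items waiting at the given timestamp, in insertion order.
  def items_at(timestamp)
    @lists["delayed:#{timestamp.to_i}"].map { |raw| JSON.parse(raw) }
  end
end

queue  = InMemoryDelayedQueue.new
first  = queue.delayed_push(Time.at(1_000), { 'class' => 'SendEmail', 'args' => [1] })
second = queue.delayed_push(1_000, { 'class' => 'SendEmail', 'args' => [2] })
third  = queue.delayed_push(500, { 'class' => 'Cleanup', 'args' => [] })
```

Both a Time and an integer are accepted because the timestamp is always passed through to_i, which matches the note that timestamp can be in seconds or a datetime object.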
#delayed_queue_peek(start, count) ⇒ Object
Returns an array of timestamps based on start and count
# File 'lib/resque_scheduler.rb', line 112
def delayed_queue_peek(start, count)
  Array(redis.zrange(:delayed_queue_schedule, start, start+count)).collect{|x| x.to_i}
end
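Redis ZRANGE treats both its start and stop indexes as inclusive, so a stop of start+count can yield up to count + 1 timestamps. The sketch below emulates that on a plain Ruby array; zrange, peek_as_written and peek_exact are hypothetical helpers written for illustration, and peek_exact is one possible variant, not part of the library.

```ruby
# Emulates ZRANGE's inclusive index range on a sorted Ruby array.
def zrange(sorted, start, stop)
  sorted[start..stop] || []
end

# Same index arithmetic as delayed_queue_peek above.
def peek_as_written(sorted, start, count)
  zrange(sorted, start, start + count).map(&:to_i)
end

# Hypothetical variant that returns at most `count` entries.
def peek_exact(sorted, start, count)
  return [] if count <= 0

  zrange(sorted, start, start + count - 1).map(&:to_i)
end

timestamps = %w[100 200 300 400 500]
as_written = peek_as_written(timestamps, 0, 2) # three entries
exact      = peek_exact(timestamps, 0, 2)      # two entries
```

Callers that page through the schedule with this method may want to account for the extra element at the end of each page.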
#delayed_queue_schedule_size ⇒ Object
Returns the size of the delayed queue schedule
# File 'lib/resque_scheduler.rb', line 117
def delayed_queue_schedule_size
  redis.zcard :delayed_queue_schedule
end
#delayed_timestamp_peek(timestamp, start, count) ⇒ Object
Returns an array of delayed items for the given timestamp
# File 'lib/resque_scheduler.rb', line 127
def delayed_timestamp_peek(timestamp, start, count)
  if 1 == count
    r = list_range "delayed:#{timestamp.to_i}", start, count
    r.nil? ? [] : [r]
  else
    list_range "delayed:#{timestamp.to_i}", start, count
  end
end
#delayed_timestamp_size(timestamp) ⇒ Object
Returns the number of jobs for a given timestamp in the delayed queue schedule
# File 'lib/resque_scheduler.rb', line 122
def delayed_timestamp_size(timestamp)
  redis.llen("delayed:#{timestamp.to_i}").to_i
end
#enqueue_at(timestamp, klass, *args) ⇒ Object
This method is nearly identical to enqueue, except that it also takes a timestamp, which is used to schedule the job for queueing. Until timestamp is in the past, the job will sit in the schedule list.
# File 'lib/resque_scheduler.rb', line 87
def enqueue_at(timestamp, klass, *args)
  delayed_push(timestamp, job_to_hash(klass, args))
end
#enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object
Identical to enqueue_at but takes number_of_seconds_from_now instead of a timestamp.
# File 'lib/resque_scheduler.rb', line 93
def enqueue_in(number_of_seconds_from_now, klass, *args)
  enqueue_at(Time.now + number_of_seconds_from_now, klass, *args)
end
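The relationship between enqueue_in and enqueue_at can be shown with a small recorder that runs without Resque or Redis. FakeScheduler and SendReminder are hypothetical names; the recorder only captures what enqueue_at receives and does not schedule anything.

```ruby
# Hypothetical recorder standing in for Resque; it only captures what
# enqueue_at receives so the relationship between the two calls is visible.
module FakeScheduler
  @scheduled = []

  class << self
    attr_reader :scheduled

    def enqueue_at(timestamp, klass, *args)
      @scheduled << { at: timestamp.to_i, class: klass.to_s, args: args }
    end

    # Same delegation as the method above: "now + delay" becomes the timestamp.
    def enqueue_in(number_of_seconds_from_now, klass, *args)
      enqueue_at(Time.now + number_of_seconds_from_now, klass, *args)
    end
  end
end

class SendReminder; end # hypothetical job class

before = Time.now.to_i
FakeScheduler.enqueue_in(300, SendReminder, 42, 'weekly')
entry = FakeScheduler.scheduled.last
```

Because the delay is resolved against Time.now at call time, the stored timestamp is absolute; a later clock change on the calling host does not move an already scheduled job.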
#get_schedule(name) ⇒ Object
Retrieves the schedule configuration for the given name.
# File 'lib/resque_scheduler.rb', line 73
def get_schedule(name)
  decode(redis.hget(:schedules, name))
end
#get_schedules ⇒ Object
gets the schedule as it exists in redis
# File 'lib/resque_scheduler.rb', line 50
def get_schedules
  if redis.exists(:schedules)
    redis.hgetall(:schedules).tap do |h|
      h.each do |name, config|
        h[name] = decode(config)
      end
    end
  else
    nil
  end
end
#next_delayed_timestamp(at_time = nil) ⇒ Object
Returns the next delayed queue timestamp (don’t call directly)
# File 'lib/resque_scheduler.rb', line 138
def next_delayed_timestamp(at_time=nil)
  items = redis.zrangebyscore :delayed_queue_schedule, '-inf', (at_time || Time.now).to_i, :limit => [0, 1]
  timestamp = items.nil? ? nil : Array(items).first
  timestamp.to_i unless timestamp.nil?
end
#next_item_for_timestamp(timestamp) ⇒ Object
Returns the next item to be processed for a given timestamp, nil if done (don’t call directly). timestamp can be either in seconds or a datetime.
# File 'lib/resque_scheduler.rb', line 147
def next_item_for_timestamp(timestamp)
  key = "delayed:#{timestamp.to_i}"

  item = decode redis.lpop(key)

  # If the list is empty, remove it.
  clean_up_timestamp(key, timestamp)
  item
end
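next_delayed_timestamp and next_item_for_timestamp are meant for the scheduler process, not for application code. How the two fit together can be sketched as a drain loop over an in-memory model. DueItems is a hypothetical class, and the loop shape is an assumption about how a scheduler might consume due jobs, not a copy of the library's loop.

```ruby
# Hypothetical in-memory model: `lists` maps a timestamp to its pending items,
# standing in for the "delayed:<ts>" lists and the :delayed_queue_schedule set.
class DueItems
  def initialize(lists)
    @lists = lists
  end

  # Earliest timestamp that is not in the future, or nil when nothing is due.
  def next_delayed_timestamp(at_time = nil)
    limit = (at_time || Time.now).to_i
    @lists.keys.select { |ts| ts <= limit }.min
  end

  # Pops one item; drops the timestamp once its list is empty.
  def next_item_for_timestamp(timestamp)
    list = @lists[timestamp.to_i]
    return nil if list.nil?

    item = list.shift
    @lists.delete(timestamp.to_i) if list.empty?
    item
  end
end

queue = DueItems.new({ 100 => %w[a b], 200 => %w[c], 900 => %w[later] })
processed = []

# Drain everything due at or before t = 500, oldest timestamp first.
while (ts = queue.next_delayed_timestamp(500))
  while (item = queue.next_item_for_timestamp(ts))
    processed << [ts, item]
  end
end
```

The inner loop ends when a timestamp's list is exhausted, and the outer loop ends when no remaining timestamp is due, which leaves the entry at 900 untouched.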
#reload_schedule! ⇒ Object
reloads the schedule from redis
# File 'lib/resque_scheduler.rb', line 45
def reload_schedule!
  @schedule = get_schedules
end
#remove_delayed(klass, *args) ⇒ Object
Given a job class and its arguments, removes every matching encoded item from the delayed queue and returns the number of items removed.
# File 'lib/resque_scheduler.rb', line 167
def remove_delayed(klass, *args)
  destroyed = 0
  search = encode(job_to_hash(klass, args))
  Array(redis.keys("delayed:*")).each do |key|
    destroyed += redis.lrem key, 0, search
  end
  destroyed
end
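Removal works by exact match on the encoded payload, so a job is only removed when both the class and the full argument list are identical to what was enqueued. The sketch below models this with plain hashes and JSON. The payload shape returned by this job_to_hash is an assumption (the real helper may include other fields), and SendEmail is a hypothetical job class.

```ruby
require 'json'

# Assumed payload shape; the real job_to_hash may include other fields.
def job_to_hash(klass, args)
  { 'class' => klass.to_s, 'args' => args }
end

# Removes every entry equal to the encoded job from each list, like LREM
# with a count of 0, and returns how many entries were deleted in total.
def remove_delayed(lists, klass, *args)
  search = JSON.generate(job_to_hash(klass, args))
  lists.values.sum do |list|
    before = list.size
    list.delete(search)
    before - list.size
  end
end

class SendEmail; end # hypothetical job class

lists = {
  'delayed:100' => [JSON.generate(job_to_hash(SendEmail, [1])),
                    JSON.generate(job_to_hash(SendEmail, [2]))],
  'delayed:200' => [JSON.generate(job_to_hash(SendEmail, [1]))]
}
removed = remove_delayed(lists, SendEmail, 1)
```

Note that the method above scans with the Redis KEYS command, which walks the whole keyspace, so the cost of a removal grows with the total number of keys and not only with the number of delayed timestamps.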
#remove_schedule(name) ⇒ Object
remove a given schedule by name
# File 'lib/resque_scheduler.rb', line 78
def remove_schedule(name)
  redis.hdel(:schedules, name)
  redis.sadd(:schedules_changed, name)
end
#reset_delayed_queue ⇒ Object
Clears all jobs created with enqueue_at or enqueue_in
# File 'lib/resque_scheduler.rb', line 158
def reset_delayed_queue
  Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item|
    redis.del "delayed:#{item}"
  end

  redis.del :delayed_queue_schedule
end
#schedule ⇒ Object
Returns the schedule hash
# File 'lib/resque_scheduler.rb', line 40
def schedule
  @schedule ||= {}
end
#schedule=(schedule_hash) ⇒ Object
Accepts a new schedule configuration of the form:
  {some_name => {"cron" => "*/5 * * * *",
                 "class" => DoSomeWork,
                 "args" => "work on this string",
                 "description" => "this thing works its butter off"},
   ...}
The keys have the following meaning:
- :name can be anything and is used only to describe the scheduled job.
- :cron can be any cron scheduling string.
- :job can be any resque job class.
- :every can be used in lieu of :cron; see rufus-scheduler’s ‘every’ usage for valid syntax. If :cron is present, it takes precedence over :every.
- :class must be a resque worker class.
- :args can be any YAML, which is converted to a Ruby literal and passed in as params (optional). If params is an array, each element in the array is passed as a separate param; otherwise params is passed in as the only parameter to perform.
- :rails_envs is the comma-separated list of environments in which the job gets loaded (optional).
- :description is just that, a description of the job (optional).
# File 'lib/resque_scheduler.rb', line 30
def schedule=(schedule_hash)
  if Resque::Scheduler.dynamic
    schedule_hash.each do |name, job_spec|
      set_schedule(name, job_spec)
    end
  end
  @schedule = schedule_hash
end
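A schedule hash following the documented keys might look like the sketch below. All job names, class names and arguments are invented, and class names are given as strings here, which is an assumption (the form shown above uses a constant). The two helpers timing_for and perform_args are hypothetical; they only illustrate the documented rules that :cron takes precedence over :every and that array args are passed as separate params.

```ruby
# Hypothetical schedule; job names, class names and arguments are invented.
SCHEDULE = {
  'nightly_report' => {
    'cron' => '0 2 * * *',
    'class' => 'BuildReport',
    'args' => ['daily'],
    'description' => 'Builds the daily report at 02:00'
  },
  'queue_cleanup' => {
    'every' => '30m',
    'class' => 'CleanQueues',
    'args' => 'stale',
    'rails_envs' => 'production, staging'
  }
}.freeze

# cron wins over every when both are present.
def timing_for(config)
  config['cron'] ? [:cron, config['cron']] : [:every, config['every']]
end

# Array args become separate parameters; anything else is a single parameter.
def perform_args(config)
  args = config['args']
  return [] if args.nil?

  args.is_a?(Array) ? args : [args]
end

SCHEDULE.each do |name, config|
  kind, spec = timing_for(config)
  puts "#{name}: #{kind} #{spec} -> #{config['class']}.perform(#{perform_args(config).inspect})"
end
```

With Resque and resque-scheduler loaded, a hash of this shape would be handed to the setter documented above, for example as Resque.schedule = SCHEDULE.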
#set_schedule(name, config) ⇒ Object
create or update a schedule with the provided name and configuration
# File 'lib/resque_scheduler.rb', line 63
def set_schedule(name, config)
  existing_config = get_schedule(name)
  unless existing_config && existing_config == config
    redis.hset(:schedules, name, encode(config))
    redis.sadd(:schedules_changed, name)
  end
  config
end
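set_schedule writes only when the stored configuration differs from the new one, and records the name in a change set in that case. A minimal in-memory sketch of that behaviour follows. ScheduleStore is a hypothetical class, JSON stands in for encode and decode, and the schedule name and job class are invented.

```ruby
require 'json'
require 'set'

# Hypothetical in-memory store standing in for the :schedules hash and the
# :schedules_changed set in Redis.
class ScheduleStore
  attr_reader :changed

  def initialize
    @schedules = {}
    @changed = Set.new
  end

  def get_schedule(name)
    raw = @schedules[name]
    raw && JSON.parse(raw)
  end

  # Writes and flags the name only when the stored config differs.
  def set_schedule(name, config)
    existing = get_schedule(name)
    unless existing && existing == config
      @schedules[name] = JSON.generate(config)
      @changed << name
    end
    config
  end

  # Removal always flags the name, mirroring remove_schedule.
  def remove_schedule(name)
    @schedules.delete(name)
    @changed << name
  end
end

store  = ScheduleStore.new
config = { 'every' => '1h', 'class' => 'SyncFeeds' }

store.set_schedule('sync', config)
store.changed.clear
store.set_schedule('sync', config) # identical config, so nothing is flagged
unchanged = store.changed.empty?

store.set_schedule('sync', config.merge('every' => '2h'))
```

In this sketch the comparison is made against the decoded value, which has string keys, so a config passed with symbol keys never compares equal and is rewritten on every call.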