Module: ResqueScheduler
- Included in: Resque
- Defined in: lib/resque_scheduler/server.rb, lib/resque_scheduler.rb, lib/resque_scheduler/version.rb, lib/resque_scheduler/search_delayed.rb
Overview
Extend Resque::Server to add tabs
Defined Under Namespace
Modules: Server
Constant Summary
- Version = '2.0.3'
Instance Method Summary
- #count_all_scheduled_jobs ⇒ Object
- #delayed_push(timestamp, item) ⇒ Object
  Used internally to stuff the item into the schedule sorted list.
- #delayed_queue ⇒ Object
- #delayed_queue_peek(start, count) ⇒ Object
  Returns an array of timestamps based on start and count.
- #delayed_queue_schedule_size ⇒ Object
  Returns the size of the delayed queue schedule.
- #delayed_timestamp_peek(timestamp, start, count) ⇒ Object
  Returns an array of delayed items for the given timestamp.
- #delayed_timestamp_size(timestamp) ⇒ Object
  Returns the number of jobs for a given timestamp in the delayed queue schedule.
- #enqueue_at(timestamp, klass, *args) ⇒ Object
  Nearly identical to enqueue, except that it also takes a timestamp, which is used to schedule the job for queueing.
- #enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object
  Identical to enqueue_at but takes number_of_seconds_from_now instead of a timestamp.
- #get_schedule(name) ⇒ Object
  Retrieves the schedule configuration for the given name.
- #get_schedules ⇒ Object
  Gets the schedule as it exists in mongo.
- #next_delayed_timestamp(at_time = nil) ⇒ Object
  Returns the next delayed queue timestamp (don’t call directly).
- #next_item_for_timestamp(timestamp) ⇒ Object
  Returns the next item to be processed for a given timestamp, nil if done.
- #pop_schedules_changed ⇒ Object
- #reload_schedule! ⇒ Object
  Reloads the schedule from mongo.
- #remove_delayed(klass, *args) ⇒ Object
  Given an encoded item, removes it from the delayed_queue. Does not clean up the way next_item_for_timestamp does. TODO: unlike resque-scheduler, it does not return the number of removed items; find_and_modify can’t be used because it only updates one item.
- #remove_schedule(name) ⇒ Object
  Removes a given schedule by name.
- #reset_delayed_queue ⇒ Object
  Clears all jobs created with enqueue_at or enqueue_in.
- #schedule ⇒ Object
  Returns the schedule hash.
- #schedule=(schedule_hash) ⇒ Object
  Accepts a new schedule configuration.
- #schedules ⇒ Object
- #schedules_changed ⇒ Object
- #search_delayed(query, start = 0, count = 1) ⇒ Object
- #search_delayed_count ⇒ Object
- #set_schedule(name, config) ⇒ Object
  Creates or updates a schedule with the provided name and configuration.
Instance Method Details
#count_all_scheduled_jobs ⇒ Object
# File 'lib/resque_scheduler.rb', line 216
def count_all_scheduled_jobs
  total_jobs = 0
  delayed_queue.find.each do |doc|
    total_jobs += (doc['items'] || []).size
  end
  total_jobs
end
#delayed_push(timestamp, item) ⇒ Object
Used internally to stuff the item into the schedule sorted list. timestamp can be either in seconds or a datetime object. Insertion is O(log(n)). Returns the number of items stored for that timestamp after the push, so a return value of 1 means it is the first job scheduled at that time.
# File 'lib/resque_scheduler.rb', line 133
def delayed_push(timestamp, item)
  # Add this item to the list for this timestamp
  doc = delayed_queue.find_and_modify(
    :query => {'_id' => timestamp.to_i},
    :update => {'$push' => {:items => item}},
    :upsert => true,
    :new => true
  )
  doc['items'].size
end
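To make the return value concrete, here is a minimal in-memory sketch of the upsert-and-push behaviour. It assumes a plain Hash can stand in for the delayed_queue collection; DelayedQueueSketch is a hypothetical name used only for illustration and is not part of the library.

```ruby
# Hypothetical in-memory stand-in for the delayed_queue collection:
# keys are integer timestamps, values are arrays of job hashes.
class DelayedQueueSketch
  def initialize
    # The default block creates a missing entry, mimicking :upsert => true.
    @docs = Hash.new { |hash, key| hash[key] = [] }
  end

  # Mirrors delayed_push: append the item and return the new list size.
  def delayed_push(timestamp, item)
    items = @docs[timestamp.to_i]
    items << item
    items.size
  end
end

queue = DelayedQueueSketch.new
at = Time.at(1_700_000_000)

# A Time and its integer form address the same entry because of to_i.
first = queue.delayed_push(at, { 'class' => 'SendEmail', 'args' => [1] })
second = queue.delayed_push(at.to_i, { 'class' => 'SendEmail', 'args' => [2] })

puts first
puts second
```

Here the first push reports a size of 1 and the second a size of 2, since both land on the same timestamp.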
#delayed_queue ⇒ Object
# File 'lib/resque_scheduler.rb', line 22
def delayed_queue
  self.mongo ||= ENV['MONGO'] || 'localhost:27017'
  @delayed_queue ||= @db.collection('delayed_queue')
end
#delayed_queue_peek(start, count) ⇒ Object
Returns an array of timestamps based on start and count
# File 'lib/resque_scheduler.rb', line 145
def delayed_queue_peek(start, count)
  delayed_queue.find({}, :skip => start, :limit => count, :fields => '_id', :sort => ['_id', 1]).map {|d| d['_id']}
end
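The sort, skip and limit options amount to paging through the timestamps in ascending order. A rough sketch of that paging, assuming a Hash keyed by integer timestamps in place of the collection (peek_timestamps is a hypothetical helper, not a library method):

```ruby
# Hypothetical stand-in for the collection: timestamp => list of jobs.
docs = { 300 => ['c'], 100 => ['a'], 200 => ['b'], 400 => ['d'] }

# Mirrors :sort => ['_id', 1], :skip => start, :limit => count.
def peek_timestamps(docs, start, count)
  docs.keys.sort.drop(start).take(count)
end

page = peek_timestamps(docs, 1, 2)
p page
```

With these four timestamps, skipping one and taking two yields 200 and 300.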
#delayed_queue_schedule_size ⇒ Object
Returns the size of the delayed queue schedule
# File 'lib/resque_scheduler.rb', line 150
def delayed_queue_schedule_size
  delayed_queue.count
end
#delayed_timestamp_peek(timestamp, start, count) ⇒ Object
Returns an array of delayed items for the given timestamp
# File 'lib/resque_scheduler.rb', line 161
def delayed_timestamp_peek(timestamp, start, count)
  doc = delayed_queue.find_one(
    {'_id' => timestamp.to_i},
    :fields => {'items' => {'$slice' => [start, count]}}
  )
  doc ? doc['items'] || [] : []
end
#delayed_timestamp_size(timestamp) ⇒ Object
Returns the number of jobs for a given timestamp in the delayed queue schedule
# File 'lib/resque_scheduler.rb', line 155
def delayed_timestamp_size(timestamp)
  document = delayed_queue.find_one('_id' => timestamp.to_i)
  document ? (document['items'] || []).size : 0
end
#enqueue_at(timestamp, klass, *args) ⇒ Object
This method is nearly identical to enqueue, except that it also takes a timestamp, which is used to schedule the job for queueing. Until the timestamp is in the past, the job sits in the schedule list.
# File 'lib/resque_scheduler.rb', line 117
def enqueue_at(timestamp, klass, *args)
  delayed_push(timestamp, job_to_hash(klass, args))
end
#enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object
Identical to enqueue_at but takes number_of_seconds_from_now instead of a timestamp.
# File 'lib/resque_scheduler.rb', line 124
def enqueue_in(number_of_seconds_from_now, klass, *args)
  enqueue_at(Time.now + number_of_seconds_from_now, klass, *args)
end
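The relationship between the two entry points can be sketched without a database. The module below is a hypothetical in-memory imitation: SchedulerSketch, its STORE constant, the SendReminder class and the shape of the stored hash are all assumptions made for illustration, and the real job_to_hash output may differ.

```ruby
# Hypothetical module mimicking enqueue_at / enqueue_in with an in-memory store.
module SchedulerSketch
  STORE = Hash.new { |hash, key| hash[key] = [] }

  # Store the job under the integer form of the timestamp.
  def self.enqueue_at(timestamp, klass, *args)
    STORE[timestamp.to_i] << { 'class' => klass.to_s, 'args' => args }
    STORE[timestamp.to_i].size
  end

  # Convert a relative delay into an absolute time, then delegate.
  def self.enqueue_in(seconds_from_now, klass, *args)
    enqueue_at(Time.now + seconds_from_now, klass, *args)
  end
end

# Hypothetical job class.
class SendReminder; end

before = Time.now.to_i
SchedulerSketch.enqueue_in(300, SendReminder, 42)

scheduled_for = SchedulerSketch::STORE.keys.first
stored_job = SchedulerSketch::STORE[scheduled_for].first
puts scheduled_for - before
p stored_job
```

The job ends up filed roughly 300 seconds after the moment of the call, which is all enqueue_in adds on top of enqueue_at.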
#get_schedule(name) ⇒ Object
Retrieves the schedule configuration for the given name.
# File 'lib/resque_scheduler.rb', line 90
def get_schedule(name)
  schedule = schedules.find_one('_id' => name)
  schedule.delete('_id') if schedule
  schedule
end
#get_schedules ⇒ Object
Gets the schedule as it exists in mongo. Returns nil when no schedules are stored.
# File 'lib/resque_scheduler.rb', line 67
def get_schedules
  if schedules.count > 0
    h = {}
    schedules.find.each do |a|
      h[a.delete('_id')] = a
    end
    h
  else
    nil
  end
end
#next_delayed_timestamp(at_time = nil) ⇒ Object
Returns the next delayed queue timestamp (don’t call directly)
# File 'lib/resque_scheduler.rb', line 171
def next_delayed_timestamp(at_time=nil)
  doc = delayed_queue.find_one(
    {'_id' => {'$lte' => (at_time || Time.now).to_i}},
    :sort => ['_id', Mongo::ASCENDING]
  )
  doc ? doc['_id'] : nil
end
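The query selects the earliest timestamp that is already due. A minimal sketch of that selection, assuming a Hash keyed by integer timestamps stands in for the collection (next_due_timestamp is a hypothetical helper name):

```ruby
# Hypothetical stand-in for the collection: timestamp => list of jobs.
docs = { 300 => ['c'], 100 => ['a'], 200 => ['b'] }

# Mirrors the '$lte' filter combined with an ascending sort on '_id':
# the smallest timestamp that is not in the future, or nil if none is due.
def next_due_timestamp(docs, at_time = nil)
  cutoff = (at_time || Time.now).to_i
  docs.keys.select { |timestamp| timestamp <= cutoff }.min
end

due = next_due_timestamp(docs, 250)
none = next_due_timestamp(docs, 50)
p due
p none
```

At time 250 both 100 and 200 are due and the earlier one wins; at time 50 nothing is due and the result is nil.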
#next_item_for_timestamp(timestamp) ⇒ Object
Returns the next item to be processed for a given timestamp, nil if done (don’t call directly). timestamp can be either in seconds or a datetime.
# File 'lib/resque_scheduler.rb', line 182
def next_item_for_timestamp(timestamp)
  # Returns the array of items before it was shifted
  doc = delayed_queue.find_and_modify(
    :query => {'_id' => timestamp.to_i},
    :update => {'$pop' => {'items' => -1}} # -1 means shift
  )
  item = doc['items'].first

  # If the list is empty, remove it.
  ()
  item
rescue Mongo::OperationFailure
  # Database command 'findandmodify' failed: {"errmsg"=>"No matching object found", "ok"=>0.0}
  nil
end
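The shift-then-clean-up flow can be illustrated in memory. This is a sketch under stated assumptions: a Hash stands in for the collection, next_item is a hypothetical helper, and the clean-up step is modelled only from the source comment "If the list is empty, remove it", not from the library's actual clean-up code.

```ruby
# Hypothetical stand-in for the collection: timestamp => list of jobs.
docs = { 100 => ['job-a', 'job-b'] }

def next_item(docs, timestamp)
  key = timestamp.to_i
  items = docs[key]
  return nil if items.nil? # no entry for this timestamp: nothing left to do

  item = items.shift               # like '$pop' => -1, removes the first element
  docs.delete(key) if items.empty? # assumed clean-up of an emptied timestamp
  item
end

first = next_item(docs, 100)
second = next_item(docs, Time.at(100)) # a Time works too, via to_i
third = next_item(docs, 100)

p first
p second
p third
```

Items come out in insertion order, and once the list is drained the entry disappears and further calls return nil.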
#pop_schedules_changed ⇒ Object
# File 'lib/resque_scheduler.rb', line 102
def pop_schedules_changed
  while doc = schedules_changed.find_and_modify(:remove => true)
    yield doc['_id']
  end
rescue Mongo::OperationFailure
  # "Database command 'findandmodify' failed: {"errmsg"=>"No matching object found", "ok"=>0.0}"
  # Sadly, the mongo driver raises (with a global exception class) instead of returning nil when
  # the collection is empty.
end
#reload_schedule! ⇒ Object
Reloads the schedule from mongo.
# File 'lib/resque_scheduler.rb', line 62
def reload_schedule!
  @schedule = get_schedules
end
#remove_delayed(klass, *args) ⇒ Object
Given an encoded item, removes it from the delayed_queue. Does not clean up the way next_item_for_timestamp does.
TODO: unlike resque-scheduler, it does not return the number of removed items; find_and_modify can’t be used because it only updates one item.
# File 'lib/resque_scheduler.rb', line 208
def remove_delayed(klass, *args)
  delayed_queue.update(
    {},
    {'$pull' => {'items' => job_to_hash(klass, args)}},
    :multi => true
  )
end
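The effect of a multi-document '$pull' can be sketched in memory: every copy of the matching job is removed from every timestamp, and emptied timestamps are left in place. The Hash stand-in and the remove_from_all helper are assumptions for illustration only.

```ruby
# Two hypothetical encoded jobs.
job = { 'class' => 'SendEmail', 'args' => [1] }
other = { 'class' => 'SendEmail', 'args' => [2] }

# Hypothetical stand-in for the collection: timestamp => list of jobs.
docs = { 100 => [job.dup, other.dup], 200 => [job.dup] }

# Mirrors '$pull' with :multi => true: drop every equal item everywhere.
# Like the documented method, it reports no count of removed items.
def remove_from_all(docs, target)
  docs.each_value { |items| items.delete(target) }
  nil
end

remove_from_all(docs, job)
p docs
```

Timestamp 200 remains present with an empty list, which is the "does not clean" behaviour noted in the description.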
#remove_schedule(name) ⇒ Object
Removes a given schedule by name.
# File 'lib/resque_scheduler.rb', line 97
def remove_schedule(name)
  schedules.remove('_id' => name)
  schedules_changed.insert('_id' => name)
end
#reset_delayed_queue ⇒ Object
Clears all jobs created with enqueue_at or enqueue_in
# File 'lib/resque_scheduler.rb', line 200
def reset_delayed_queue
  delayed_queue.remove
end
#schedule ⇒ Object
Returns the schedule hash
# File 'lib/resque_scheduler.rb', line 57
def schedule
  @schedule ||= {}
end
#schedule=(schedule_hash) ⇒ Object
Accepts a new schedule configuration of the form:
{some_name => {"cron" => "5/* * * *",
               "class" => DoSomeWork,
               "args" => "work on this string",
               "description" => "this thing works it's butter off"},
 ...}
- :name can be anything and is used only to describe the scheduled job.
- :cron can be any cron scheduling string.
- :job can be any resque job class.
- :every can be used in lieu of :cron. See rufus-scheduler’s ‘every’ usage for valid syntax. If :cron is present it will take precedence over :every.
- :class must be a resque worker class.
- :args can be any yaml which will be converted to a ruby literal and passed in as params (optional).
- :rails_envs is the list of envs where the job gets loaded. Envs are comma separated (optional).
- :description is just that, a description of the job (optional).
If params is an array, each element in the array is passed as a separate param, otherwise params is passed in as the only parameter to perform.
# File 'lib/resque_scheduler.rb', line 47
def schedule=(schedule_hash)
  if Resque::Scheduler.dynamic
    schedule_hash.each do |name, job_spec|
      set_schedule(name, job_spec)
    end
  end
  @schedule = schedule_hash
end
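As an illustration of the configuration shape, the sketch below builds a schedule hash with string keys, as in the form shown above, and applies the stated rule that cron takes precedence over every. The schedule names, the 'DoSomeWork' class name given as a string, the interval values and the trigger_for helper are all hypothetical; the library's own resolution logic is not shown in this page.

```ruby
# Hypothetical schedule: names, class and intervals are illustrative only.
schedule_hash = {
  'nightly_cleanup' => {
    'cron'        => '0 3 * * *', # every day at 03:00
    'every'       => '1d',        # ignored here because 'cron' is present
    'class'       => 'DoSomeWork',
    'args'        => 'work on this string',
    'description' => 'runs the nightly cleanup'
  },
  'heartbeat' => {
    'every' => '30s',
    'class' => 'DoSomeWork'
  }
}

# Hypothetical helper applying the documented precedence: cron wins over every.
def trigger_for(job_spec)
  return ['cron', job_spec['cron']] if job_spec['cron']

  ['every', job_spec['every']]
end

schedule_hash.each do |name, job_spec|
  kind, value = trigger_for(job_spec)
  puts "#{name}: #{kind} #{value}"
end
```

A hash of this shape is what would be assigned through schedule=; when dynamic mode is on, each entry is additionally passed to set_schedule.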
#schedules ⇒ Object
# File 'lib/resque_scheduler.rb', line 12
def schedules
  self.mongo ||= ENV['MONGO'] || 'localhost:27017'
  @schedules ||= @db.collection('schedules')
end
#schedules_changed ⇒ Object
# File 'lib/resque_scheduler.rb', line 17
def schedules_changed
  self.mongo ||= ENV['MONGO'] || 'localhost:27017'
  @schedules_changed ||= @db.collection('schedules_changed')
end
#search_delayed(query, start = 0, count = 1) ⇒ Object
# File 'lib/resque_scheduler/search_delayed.rb', line 7
def search_delayed(query, start = 0, count = 1)
  if query.nil? || query.empty?
    @@search_results = []
    return []
  end

  start, count = [start, count].map { |n| Integer(n) }
  set_results = Set.new

  # For each search term, retrieve the failed jobs that contain at least one relevant field matching the regexp defined by that search term
  query.split.each do |term|
    partial_results = []
    self.delayed_queue.find().each do |row|
      row['items'].each do |job|
        if job['class'] =~ /#{term}/i || job['queue'] =~ /#{term}/i
          partial_results << row['_id']
        else
          job['args'].each do |arg|
            arg.each do |key, value|
              if key =~ /#{term}/i || value =~ /#{term}/i
                partial_results << row['_id']
              end
            end
          end
        end
      end
    end

    # If the set was empty, merge the first results, else intersect it with the current results
    if set_results.empty?
      set_results.merge(partial_results)
    else
      set_results = set_results & partial_results
    end
  end

  # search_res will be an array containing 'count' values, starting with 'start', sorted in descending order
  @@search_results = set_results.to_a || []
  search_results = set_results.to_a[start, count]
  search_results || []
end
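The core idea, matching each whitespace-separated term case-insensitively and intersecting the per-term results, can be sketched in memory. This is a simplified illustration, not the library's code: search_timestamps is a hypothetical helper, it matches only the class and queue fields, it escapes each term before building the regexp, and it uses nil rather than an empty set to mark "no term processed yet".

```ruby
require 'set'

# Hypothetical stand-in for the collection: timestamp => list of job hashes.
docs = {
  100 => [{ 'class' => 'SendEmail', 'queue' => 'mail' }],
  200 => [{ 'class' => 'SendSms',   'queue' => 'sms' }],
  300 => [{ 'class' => 'Cleanup',   'queue' => 'mail' }]
}

# Returns the sorted timestamps whose jobs match every term in the query.
def search_timestamps(docs, query)
  return [] if query.nil? || query.empty?

  results = nil
  query.split.each do |term|
    pattern = /#{Regexp.escape(term)}/i
    matching = docs.select do |_timestamp, jobs|
      jobs.any? do |job|
        pattern.match?(job['class'].to_s) || pattern.match?(job['queue'].to_s)
      end
    end
    matches = matching.keys.to_set
    # The first term seeds the result; later terms narrow it by intersection.
    results = results.nil? ? matches : results & matches
  end
  results.to_a.sort
end

one_term = search_timestamps(docs, 'send')
two_terms = search_timestamps(docs, 'send mail')
p one_term
p two_terms
```

A single term matches every timestamp holding a job whose class or queue contains it; adding a second term keeps only the timestamps that satisfy both.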
#search_delayed_count ⇒ Object
# File 'lib/resque_scheduler/search_delayed.rb', line 3
def search_delayed_count
  @@search_results.count
end
#set_schedule(name, config) ⇒ Object
Creates or updates a schedule with the provided name and configuration.
# File 'lib/resque_scheduler.rb', line 80
def set_schedule(name, config)
  existing_config = get_schedule(name)
  unless existing_config && existing_config == config
    schedules.insert(config.merge('_id' => name))
    schedules_changed.insert('_id' => name)
  end
  config
end
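The interplay between set_schedule, remove_schedule and pop_schedules_changed is a small change-notification pattern: writers record the name of each changed schedule, and a consumer drains those names. A minimal in-memory sketch, assuming a Hash and an Array in place of the two collections (ScheduleStoreSketch is a hypothetical class, not part of the library):

```ruby
# Hypothetical in-memory imitation of the schedules and schedules_changed collections.
class ScheduleStoreSketch
  def initialize
    @schedules = {}
    @changed = []
  end

  # Store the config and record a change only when it differs from what is stored.
  def set_schedule(name, config)
    unless @schedules[name] == config
      @schedules[name] = config
      @changed << name
    end
    config
  end

  # Removing a schedule is also recorded as a change.
  def remove_schedule(name)
    @schedules.delete(name)
    @changed << name
  end

  # Drain the change list, yielding each changed schedule name once.
  def pop_schedules_changed
    until @changed.empty?
      yield @changed.shift
    end
  end
end

store = ScheduleStoreSketch.new
config = { 'cron' => '0 3 * * *', 'class' => 'DoSomeWork' }

store.set_schedule('nightly', config)
store.set_schedule('nightly', config) # identical config, so no second change record
store.remove_schedule('nightly')

seen = []
store.pop_schedules_changed { |name| seen << name }
p seen
```

The consumer sees two notifications for 'nightly', one for the initial write and one for the removal; the repeated identical write is skipped.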