Module: ResqueScheduler
- Included in:
- Resque
- Defined in:
- lib/resque_scheduler/server.rb,
lib/resque_scheduler.rb,
lib/resque_scheduler/version.rb
Overview
Extend Resque::Server to add tabs
Defined Under Namespace
Modules: Server
Constant Summary collapse
- Version =
'1.0.12'
Instance Method Summary collapse
-
#delayed_push(timestamp, item) ⇒ Object
Used internally to stuff the item into the schedule sorted list.
-
#delayed_queue_peek(start, count) ⇒ Object
Returns an array of timestamps based on start and count.
-
#delayed_queue_schedule_size ⇒ Object
Returns the size of the delayed queue schedule.
-
#delayed_timestamp_peek(timestamp, start, count) ⇒ Object
Returns an array of delayed items for the given timestamp.
-
#delayed_timestamp_size(timestamp) ⇒ Object
Returns the number of jobs for a given timestamp in the delayed queue schedule.
-
#enqueue_at(timestamp, klass, *args) ⇒ Object
This method is nearly identical to
enqueue
only it also takes a timestamp which will be used to schedule the job for queueing. -
#enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object
Identical to enqueue_at but takes number_of_seconds_from_now instead of a timestamp.
-
#next_delayed_timestamp ⇒ Object
Returns the next delayed queue timestamp (don’t call directly).
-
#next_item_for_timestamp(timestamp) ⇒ Object
Returns the next item to be processed for a given timestamp, nil if done.
-
#schedule ⇒ Object
Returns the schedule hash.
-
#schedule=(schedule_hash) ⇒ Object
Accepts a new schedule configuration of the form:.
Instance Method Details
#delayed_push(timestamp, item) ⇒ Object
Used internally to stuff the item into the schedule sorted list. timestamp
can be either in seconds or a datetime object Insertion if O(log(n)). Returns true if it’s the first job to be scheduled at that time, else false
54 55 56 57 58 59 60 61 62 |
# File 'lib/resque_scheduler.rb', line 54 def delayed_push(, item) # First add this item to the list for this timestamp redis.rpush("delayed:#{.to_i}", encode(item)) # Now, add this timestamp to the zsets. The score and the value are # the same since we'll be querying by timestamp, and we don't have # anything else to store. redis.zadd :delayed_queue_schedule, .to_i, .to_i end |
#delayed_queue_peek(start, count) ⇒ Object
Returns an array of timestamps based on start and count
65 66 67 |
# File 'lib/resque_scheduler.rb', line 65 def delayed_queue_peek(start, count) redis.zrange(:delayed_queue_schedule, start, start+count).collect(&:to_i) end |
#delayed_queue_schedule_size ⇒ Object
Returns the size of the delayed queue schedule
70 71 72 |
# File 'lib/resque_scheduler.rb', line 70 def delayed_queue_schedule_size redis.zcard :delayed_queue_schedule end |
#delayed_timestamp_peek(timestamp, start, count) ⇒ Object
Returns an array of delayed items for the given timestamp
80 81 82 83 84 85 86 87 |
# File 'lib/resque_scheduler.rb', line 80 def (, start, count) if 1 == count r = list_range "delayed:#{.to_i}", start, count r.nil? ? [] : [r] else list_range "delayed:#{.to_i}", start, count end end |
#delayed_timestamp_size(timestamp) ⇒ Object
Returns the number of jobs for a given timestamp in the delayed queue schedule
75 76 77 |
# File 'lib/resque_scheduler.rb', line 75 def () redis.llen("delayed:#{.to_i}").to_i end |
#enqueue_at(timestamp, klass, *args) ⇒ Object
This method is nearly identical to enqueue
only it also takes a timestamp which will be used to schedule the job for queueing. Until timestamp is in the past, the job will sit in the schedule list.
40 41 42 |
# File 'lib/resque_scheduler.rb', line 40 def enqueue_at(, klass, *args) delayed_push(, :class => klass.to_s, :args => args, :queue => queue_from_class(klass)) end |
#enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object
Identical to enqueue_at but takes number_of_seconds_from_now instead of a timestamp.
46 47 48 |
# File 'lib/resque_scheduler.rb', line 46 def enqueue_in(number_of_seconds_from_now, klass, *args) enqueue_at(Time.now + number_of_seconds_from_now, klass, *args) end |
#next_delayed_timestamp ⇒ Object
Returns the next delayed queue timestamp (don’t call directly)
91 92 93 94 |
# File 'lib/resque_scheduler.rb', line 91 def = redis.zrangebyscore(:delayed_queue_schedule, '-inf', Time.now.to_i, 'limit', 0, 1).first .to_i unless .nil? end |
#next_item_for_timestamp(timestamp) ⇒ Object
Returns the next item to be processed for a given timestamp, nil if done. (don’t call directly) timestamp
can either be in seconds or a datetime
99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/resque_scheduler.rb', line 99 def () key = "delayed:#{.to_i}" item = decode redis.lpop(key) # If the list is empty, remove it. if 0 == redis.llen(key).to_i redis.del key redis.zrem :delayed_queue_schedule, .to_i end item end |
#schedule ⇒ Object
Returns the schedule hash
32 33 34 |
# File 'lib/resque_scheduler.rb', line 32 def schedule YAML.load(redis[:resque_schedule_hash]) end |
#schedule=(schedule_hash) ⇒ Object
Accepts a new schedule configuration of the form:
{some_name => {"cron" => "5/* * * *",
"class" => DoSomeWork,
"args" => "work on this string",
"description" => "this thing works it"s butter off"},
...}
:name can be anything and is used only to describe the scheduled job :cron can be any cron scheduling string :job can be any resque job class :class must be a resque worker class :args can be any yaml which will be converted to a ruby literal and passed
in a params. (optional)
:description is just that, a description of the job (optional). If params is
an array, each element in the array is passed as a separate param,
otherwise params is passed in as the only parameter to perform.
27 28 29 |
# File 'lib/resque_scheduler.rb', line 27 def schedule=(schedule_hash) redis[:resque_schedule_hash] = schedule_hash.to_yaml end |