Module: ResqueScheduler

Included in:
Resque
Defined in:
lib/resque_scheduler/server.rb,
lib/resque_scheduler.rb,
lib/resque_scheduler/version.rb

Overview

Extend Resque::Server to add tabs

Defined Under Namespace

Modules: Server

Constant Summary collapse

Version =
'1.10.10'

Instance Method Summary collapse

Instance Method Details

#count_all_scheduled_jobsObject



180
181
182
183
184
185
186
# File 'lib/resque_scheduler.rb', line 180

# Counts every job currently waiting in the delayed queue.
#
# Reads all timestamps from the :delayed_queue_schedule sorted set and adds
# up the length of each timestamp's "delayed:<timestamp>" list.
#
# Returns the total as an Integer (0 when no timestamps are scheduled).
def count_all_scheduled_jobs
  timestamps = Array(redis.zrange(:delayed_queue_schedule, 0, -1))
  timestamps.inject(0) do |sum, timestamp|
    sum + redis.llen("delayed:#{timestamp}").to_i
  end
end

#delayed_push(timestamp, item) ⇒ Object

Used internally to stuff the item into the schedule sorted list. timestamp can be either in seconds or a datetime object. Insertion is O(log(n)). Returns true if it’s the first job to be scheduled at that time, else false.



105
106
107
108
109
110
111
112
113
# File 'lib/resque_scheduler.rb', line 105

# Appends +item+ to the list of jobs for +timestamp+ and registers the
# timestamp in the :delayed_queue_schedule sorted set.
#
# timestamp - anything responding to #to_i (only #to_i is called on it).
# item      - the job payload; it is run through encode before being stored.
#
# Returns whatever redis.zadd returns.
def delayed_push(timestamp, item)
  score = timestamp.to_i

  # Queue the encoded job on this timestamp's list first ...
  redis.rpush("delayed:#{score}", encode(item))

  # ... then record the timestamp itself. Member and score are the same
  # value: the set is queried by timestamp and there is nothing else to store.
  redis.zadd(:delayed_queue_schedule, score, score)
end

#delayed_queue_peek(start, count) ⇒ Object

Returns an array of timestamps based on start and count



116
117
118
# File 'lib/resque_scheduler.rb', line 116

# Returns an array of Integer timestamps from the delayed queue schedule.
#
# start - zero-based index of the first timestamp to return.
# count - maximum number of timestamps to return.
#
# ZRANGE treats both its start and stop indexes as inclusive, so the stop
# index is start + count - 1; passing start + count would yield one extra
# timestamp. A non-positive count returns [] up front, because a stop index
# of -1 means "through the last element" to ZRANGE.
def delayed_queue_peek(start, count)
  return [] unless count > 0

  timestamps = redis.zrange(:delayed_queue_schedule, start, start + count - 1)
  Array(timestamps).collect { |timestamp| timestamp.to_i }
end

#delayed_queue_schedule_sizeObject

Returns the size of the delayed queue schedule



121
122
123
# File 'lib/resque_scheduler.rb', line 121

# Size of the delayed queue schedule, as reported by ZCARD on the
# :delayed_queue_schedule sorted set.
def delayed_queue_schedule_size
  redis.zcard(:delayed_queue_schedule)
end

#delayed_timestamp_peek(timestamp, start, count) ⇒ Object

Returns an array of delayed items for the given timestamp



131
132
133
134
135
136
137
138
# File 'lib/resque_scheduler.rb', line 131

# Returns an array of delayed items stored for +timestamp+.
#
# timestamp - anything responding to #to_i; selects the "delayed:<n>" list.
# start     - passed through to list_range as the starting offset.
# count     - passed through to list_range as the number of items.
#
# When count is 1 the list_range result is wrapped in an array (or replaced
# by [] when it is nil), so callers always receive an array in that case.
# NOTE(review): presumably list_range returns a bare item rather than an
# array for a count of 1 - confirm against its definition.
def delayed_timestamp_peek(timestamp, start, count)
  items = list_range("delayed:#{timestamp.to_i}", start, count)
  return items unless 1 == count

  items.nil? ? [] : [items]
end

#delayed_timestamp_size(timestamp) ⇒ Object

Returns the number of jobs for a given timestamp in the delayed queue schedule



126
127
128
# File 'lib/resque_scheduler.rb', line 126

# Returns the number of jobs queued for +timestamp+ in the delayed queue,
# i.e. the length of the "delayed:<timestamp>" list, as an Integer.
def delayed_timestamp_size(timestamp)
  key = "delayed:#{timestamp.to_i}"
  redis.llen(key).to_i
end

#enqueue_at(timestamp, klass, *args) ⇒ Object

This method is nearly identical to enqueue only it also takes a timestamp which will be used to schedule the job for queueing. Until timestamp is in the past, the job will sit in the schedule list.



91
92
93
# File 'lib/resque_scheduler.rb', line 91

# Schedules a job of +klass+ with +args+ to be queued at +timestamp+.
#
# Builds the job hash with job_to_hash and hands it to delayed_push.
# Returns delayed_push's result.
def enqueue_at(timestamp, klass, *args)
  job = job_to_hash(klass, args)
  delayed_push(timestamp, job)
end

#enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object

Identical to enqueue_at but takes number_of_seconds_from_now instead of a timestamp.



97
98
99
# File 'lib/resque_scheduler.rb', line 97

# Same as enqueue_at, but the run time is given as an offset in seconds
# from Time.now rather than as a timestamp.
#
# Returns enqueue_at's result.
def enqueue_in(number_of_seconds_from_now, klass, *args)
  run_at = Time.now + number_of_seconds_from_now
  enqueue_at(run_at, klass, *args)
end

#get_schedule(name) ⇒ Object

Retrieves the schedule configuration for the given name.



78
79
80
# File 'lib/resque_scheduler.rb', line 78

# Retrieves the schedule configuration stored under +name+.
#
# Reads the +name+ field of the :schedules hash and returns it after
# passing it through decode.
def get_schedule(name)
  encoded = redis.hget(:schedules, name)
  decode(encoded)
end

#get_schedulesObject

gets the schedule as it exists in redis



51
52
53
54
55
56
57
58
59
60
61
# File 'lib/resque_scheduler.rb', line 51

# Gets every schedule as it currently exists in redis.
#
# Returns a Hash mapping each schedule name to its decoded configuration, or
# nil when the :schedules hash holds no entries.
def get_schedules
  # Redis removes a hash once its last field is gone, so an empty HGETALL
  # reply means the key is absent. Checking the reply itself, rather than
  # branching on the truthiness of redis.exists, matters because some client
  # versions answer EXISTS with an Integer, and 0 is truthy in Ruby. It also
  # saves a round trip.
  stored = redis.hgetall(:schedules)
  return nil if stored.nil? || stored.empty?

  stored.inject({}) do |schedules, (name, config)|
    schedules[name] = decode(config)
    schedules
  end
end

#mark_schedules_as_updatedObject



73
74
75
# File 'lib/resque_scheduler.rb', line 73

# Clears the :schedules_updated flag - the key set_schedule writes and
# needs_updating? reads - by deleting it from redis.
#
# Returns whatever redis.del returns.
def mark_schedules_as_updated
  flag = :schedules_updated
  redis.del(flag)
end

#needs_updating?Boolean

Returns:

  • (Boolean)


69
70
71
# File 'lib/resque_scheduler.rb', line 69

# Reports whether the :schedules_updated flag is currently set in redis.
#
# Returns true when the key holds a value, false otherwise.
def needs_updating?
  !!redis.get(:schedules_updated)
end

#next_delayed_timestampObject

Returns the next delayed queue timestamp (don’t call directly)



142
143
144
145
146
# File 'lib/resque_scheduler.rb', line 142

# Returns the next delayed queue timestamp that is due (don't call directly).
#
# Asks the :delayed_queue_schedule sorted set for at most one member whose
# score lies between '-inf' and the current time.
#
# Returns that member as an Integer, or nil when nothing is due.
def next_delayed_timestamp
  due = redis.zrangebyscore(:delayed_queue_schedule, '-inf', Time.now.to_i, :limit => [0, 1])
  earliest = Array(due).first
  earliest.nil? ? nil : earliest.to_i
end

#next_item_for_timestamp(timestamp) ⇒ Object

Returns the next item to be processed for a given timestamp, or nil if done (don’t call directly). timestamp can be either in seconds or a datetime.



151
152
153
154
155
156
157
158
159
# File 'lib/resque_scheduler.rb', line 151

# Pops and returns the next job queued for +timestamp+ (don't call directly).
#
# timestamp - anything responding to #to_i; selects the "delayed:<n>" list.
#
# Returns the decoded head of the list (whatever decode yields for the
# popped value).
def next_item_for_timestamp(timestamp)
  list_key = "delayed:#{timestamp.to_i}"

  decode(redis.lpop(list_key)).tap do
    # Runs after every pop: if the list is empty, remove it.
    clean_up_timestamp(list_key, timestamp)
  end
end

#reload_schedule!Object

reloads the schedule from redis



46
47
48
# File 'lib/resque_scheduler.rb', line 46

# Reloads the schedule from redis.
#
# Stores the result of get_schedules in @schedule and returns it.
def reload_schedule!
  fresh = get_schedules
  @schedule = fresh
end

#remove_delayed(klass, *args) ⇒ Object

Given a job class and its arguments, removes every matching job from the delayed queue. Returns the number of jobs removed.



171
172
173
174
175
176
177
178
# File 'lib/resque_scheduler.rb', line 171

# Removes every delayed job matching +klass+ and +args+.
#
# klass - the job class, as passed to enqueue_at / enqueue_in.
# args  - the job arguments, as passed to enqueue_at / enqueue_in.
#
# The candidate lists come from the :delayed_queue_schedule index rather
# than from a KEYS "delayed:*" scan: KEYS walks the entire keyspace, while
# delayed_push registers every timestamp it writes to in that sorted set.
#
# Returns the number of jobs removed as an Integer.
def remove_delayed(klass, *args)
  search = encode(job_to_hash(klass, args))

  Array(redis.zrange(:delayed_queue_schedule, 0, -1)).inject(0) do |destroyed, timestamp|
    destroyed + redis.lrem("delayed:#{timestamp}", 0, search).to_i
  end
end

#remove_schedule(name) ⇒ Object

remove a given schedule by name



83
84
85
# File 'lib/resque_scheduler.rb', line 83

# Removes the schedule stored under +name+.
#
# Like set_schedule, this also sets the :schedules_updated flag, so that
# needs_updating? reports the change; without it a removal leaves the flag
# untouched.
#
# Returns whatever redis.hdel returns.
def remove_schedule(name)
  removed = redis.hdel(:schedules, name)
  redis.set(:schedules_updated, Time.now.to_s)
  removed
end

#reset_delayed_queueObject

Clears all jobs created with enqueue_at or enqueue_in



162
163
164
165
166
167
168
# File 'lib/resque_scheduler.rb', line 162

# Clears all jobs created with enqueue_at or enqueue_in.
#
# Deletes the "delayed:<timestamp>" list of every timestamp found in the
# :delayed_queue_schedule sorted set, then deletes the sorted set itself.
#
# Returns the result of that final redis.del.
def reset_delayed_queue
  timestamps = Array(redis.zrange(:delayed_queue_schedule, 0, -1))
  timestamps.each { |timestamp| redis.del("delayed:#{timestamp}") }

  redis.del(:delayed_queue_schedule)
end

#scheduleObject

Returns the schedule hash



41
42
43
# File 'lib/resque_scheduler.rb', line 41

# Returns the schedule hash: the result of get_schedules, or an empty Hash
# when get_schedules yields nothing.
def schedule
  stored = get_schedules
  stored ? stored : {}
end

#schedule=(schedule_hash) ⇒ Object

Convenience method for placing a schedule configuration into the redis server

Accepts a new schedule configuration of the form:

{some_name => {"cron" => "*/5 * * * *",
               "class" => DoSomeWork,
               "args" => "work on this string",
               "description" => "this thing works it's butter off"},
 ...}

:name can be anything and is used only to describe the scheduled job.
:cron can be any cron scheduling string.
:job can be any resque job class.
:class must be a resque worker class.
:args can be any YAML, which will be converted to a Ruby literal and passed in as params (optional).
:rails_envs is the list of envs where the job gets loaded; envs are comma separated (optional).
:description is just that, a description of the job (optional).

If params is an array, each element in the array is passed as a separate param; otherwise params is passed in as the only parameter to perform.


31
32
33
34
35
36
37
38
# File 'lib/resque_scheduler.rb', line 31

# Convenience writer that places a whole schedule configuration into redis.
#
# schedule_hash - maps each schedule name to its job specification.
#
# Every entry is stored individually through set_schedule.
def schedule=(schedule_hash)
  schedule_hash.each { |name, job_spec| set_schedule(name, job_spec) }
end

#set_schedule(name, config) ⇒ Object

create or update a schedule with the provided name and configuration



64
65
66
67
# File 'lib/resque_scheduler.rb', line 64

# Creates or updates the schedule stored under +name+.
#
# name   - field of the :schedules hash to write.
# config - the schedule configuration; stored after passing through encode.
#
# Also writes the current time (as a String) to :schedules_updated, the key
# needs_updating? checks. Returns the result of that redis.set call.
def set_schedule(name, config)
  encoded = encode(config)
  redis.hset(:schedules, name, encoded)
  redis.set(:schedules_updated, Time.now.to_s)
end