Module: ResqueScheduler

Included in:
Resque
Defined in:
lib/resque_scheduler/server.rb,
lib/resque_scheduler.rb,
lib/resque_scheduler/version.rb

Overview

Extend Resque::Server to add tabs

Defined Under Namespace

Modules: Server

Constant Summary collapse

Version =
'1.10.15'

Instance Method Summary collapse

Instance Method Details

#count_all_scheduled_jobsObject



176
177
178
179
180
181
182
# File 'lib/resque_scheduler.rb', line 176

def count_all_scheduled_jobs
  total_jobs = 0 
  Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |timestamp|
    total_jobs += redis.llen("delayed:#{timestamp}").to_i
  end 
  total_jobs
end

#delayed_push(timestamp, item) ⇒ Object

Used internally to stuff the item into the schedule sorted list. timestamp can be either in seconds or a datetime object Insertion if O(log(n)). Returns true if it’s the first job to be scheduled at that time, else false



101
102
103
104
105
106
107
108
109
# File 'lib/resque_scheduler.rb', line 101

def delayed_push(timestamp, item)
  # First add this item to the list for this timestamp
  redis.rpush("delayed:#{timestamp.to_i}", encode(item))

  # Now, add this timestamp to the zsets.  The score and the value are
  # the same since we'll be querying by timestamp, and we don't have
  # anything else to store.
  redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i
end

#delayed_queue_peek(start, count) ⇒ Object

Returns an array of timestamps based on start and count



112
113
114
# File 'lib/resque_scheduler.rb', line 112

def delayed_queue_peek(start, count)
  Array(redis.zrange(:delayed_queue_schedule, start, start+count)).collect{|x| x.to_i}
end

#delayed_queue_schedule_sizeObject

Returns the size of the delayed queue schedule



117
118
119
# File 'lib/resque_scheduler.rb', line 117

def delayed_queue_schedule_size
  redis.zcard :delayed_queue_schedule
end

#delayed_timestamp_peek(timestamp, start, count) ⇒ Object

Returns an array of delayed items for the given timestamp



127
128
129
130
131
132
133
134
# File 'lib/resque_scheduler.rb', line 127

def delayed_timestamp_peek(timestamp, start, count)
  if 1 == count
    r = list_range "delayed:#{timestamp.to_i}", start, count
    r.nil? ? [] : [r]
  else
    list_range "delayed:#{timestamp.to_i}", start, count
  end
end

#delayed_timestamp_size(timestamp) ⇒ Object

Returns the number of jobs for a given timestamp in the delayed queue schedule



122
123
124
# File 'lib/resque_scheduler.rb', line 122

def delayed_timestamp_size(timestamp)
  redis.llen("delayed:#{timestamp.to_i}").to_i
end

#enqueue_at(timestamp, klass, *args) ⇒ Object

This method is nearly identical to enqueue only it also takes a timestamp which will be used to schedule the job for queueing. Until timestamp is in the past, the job will sit in the schedule list.



87
88
89
# File 'lib/resque_scheduler.rb', line 87

def enqueue_at(timestamp, klass, *args)
  delayed_push(timestamp, job_to_hash(klass, args))
end

#enqueue_in(number_of_seconds_from_now, klass, *args) ⇒ Object

Identical to enqueue_at but takes number_of_seconds_from_now instead of a timestamp.



93
94
95
# File 'lib/resque_scheduler.rb', line 93

def enqueue_in(number_of_seconds_from_now, klass, *args)
  enqueue_at(Time.now + number_of_seconds_from_now, klass, *args)
end

#get_schedule(name) ⇒ Object

retrive the schedule configuration for the given name



73
74
75
# File 'lib/resque_scheduler.rb', line 73

def get_schedule(name)
  decode(redis.hget(:schedules, name))
end

#get_schedulesObject

gets the schedule as it exists in redis



50
51
52
53
54
55
56
57
58
59
60
# File 'lib/resque_scheduler.rb', line 50

def get_schedules
  if redis.exists(:schedules)
    redis.hgetall(:schedules).tap do |h|
      h.each do |name, config|
        h[name] = decode(config)
      end
    end
  else
    nil
  end
end

#next_delayed_timestamp(at_time = nil) ⇒ Object

Returns the next delayed queue timestamp (don’t call directly)



138
139
140
141
142
# File 'lib/resque_scheduler.rb', line 138

def next_delayed_timestamp(at_time=nil)
  items = redis.zrangebyscore :delayed_queue_schedule, '-inf', (at_time || Time.now).to_i, :limit => [0, 1]
  timestamp = items.nil? ? nil : Array(items).first
  timestamp.to_i unless timestamp.nil?
end

#next_item_for_timestamp(timestamp) ⇒ Object

Returns the next item to be processed for a given timestamp, nil if done. (don’t call directly) timestamp can either be in seconds or a datetime



147
148
149
150
151
152
153
154
155
# File 'lib/resque_scheduler.rb', line 147

def next_item_for_timestamp(timestamp)
  key = "delayed:#{timestamp.to_i}"

  item = decode redis.lpop(key)

  # If the list is empty, remove it.
  clean_up_timestamp(key, timestamp)
  item
end

#reload_schedule!Object

reloads the schedule from redis



45
46
47
# File 'lib/resque_scheduler.rb', line 45

def reload_schedule!
  @schedule = get_schedules
end

#remove_delayed(klass, *args) ⇒ Object

given an encoded item, remove it from the delayed_queue



167
168
169
170
171
172
173
174
# File 'lib/resque_scheduler.rb', line 167

def remove_delayed(klass, *args)
  destroyed = 0
  search = encode(job_to_hash(klass, args))
  Array(redis.keys("delayed:*")).each do |key|
    destroyed += redis.lrem key, 0, search
  end
  destroyed
end

#remove_schedule(name) ⇒ Object

remove a given schedule by name



78
79
80
81
# File 'lib/resque_scheduler.rb', line 78

def remove_schedule(name)
  redis.hdel(:schedules, name)
  redis.sadd(:schedules_changed, name)
end

#reset_delayed_queueObject

Clears all jobs created with enqueue_at or enqueue_in



158
159
160
161
162
163
164
# File 'lib/resque_scheduler.rb', line 158

def reset_delayed_queue
  Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item|
    redis.del "delayed:#{item}"
  end

  redis.del :delayed_queue_schedule
end

#scheduleObject

Returns the schedule hash



40
41
42
# File 'lib/resque_scheduler.rb', line 40

def schedule
  @schedule ||= {}
end

#schedule=(schedule_hash) ⇒ Object

Accepts a new schedule configuration of the form:

{some_name => {"cron" => "5/* * * *",
               "class" => DoSomeWork,
               "args" => "work on this string",
               "description" => "this thing works it"s butter off"},
 ...}

:name can be anything and is used only to describe the scheduled job :cron can be any cron scheduling string :job can be any resque job class :every can be used in lieu of :cron. see rufus-scheduler’s ‘every’ usage for

valid syntax. If :cron is present it will take precedence over :every.

:class must be a resque worker class :args can be any yaml which will be converted to a ruby literal and passed

in a params. (optional)

:rails_envs is the list of envs where the job gets loaded. Envs are comma separated (optional) :description is just that, a description of the job (optional). If params is

an array, each element in the array is passed as a separate param,
otherwise params is passed in as the only parameter to perform.


30
31
32
33
34
35
36
37
# File 'lib/resque_scheduler.rb', line 30

def schedule=(schedule_hash)
  if Resque::Scheduler.dynamic
    schedule_hash.each do |name, job_spec|
      set_schedule(name, job_spec)
    end
  end
  @schedule = schedule_hash
end

#set_schedule(name, config) ⇒ Object

create or update a schedule with the provided name and configuration



63
64
65
66
67
68
69
70
# File 'lib/resque_scheduler.rb', line 63

def set_schedule(name, config)
  existing_config = get_schedule(name)
  unless existing_config && existing_config == config
    redis.hset(:schedules, name, encode(config))
    redis.sadd(:schedules_changed, name)
  end
  config
end