Module: Qu::Extensions::Scheduler::Redis

Defined in:
lib/qu/extensions/scheduler/redis.rb

Instance Method Summary collapse

Instance Method Details

#count_all_scheduled_jobsObject



178
179
180
181
182
183
184
# File 'lib/qu/extensions/scheduler/redis.rb', line 178

def count_all_scheduled_jobs
  total_jobs = 0
  Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |timestamp|
    total_jobs += redis.llen("delayed:#{timestamp}").to_i
  end
  total_jobs
end

#delayed_push(timestamp, payload) ⇒ Object

Used internally to stuff the item into the schedule sorted list. timestamp can be either in seconds or a datetime object Insertion if O(log(n)). Returns true if it’s the first job to be scheduled at that time, else false



98
99
100
101
102
103
104
105
106
# File 'lib/qu/extensions/scheduler/redis.rb', line 98

def delayed_push(timestamp, payload)
  # First add this item to the list for this timestamp
  redis.rpush("delayed:#{timestamp.to_i}", encode('klass' => payload.klass.to_s, 'args' => payload.args))

  # Now, add this timestamp to the zsets.  The score and the value are
  # the same since we'll be querying by timestamp, and we don't have
  # anything else to store.
  redis.zadd :delayed_queue_schedule, timestamp.to_i, timestamp.to_i
end

#delayed_queue_peek(start, count) ⇒ Object

Returns an array of timestamps based on start and count



109
110
111
# File 'lib/qu/extensions/scheduler/redis.rb', line 109

def delayed_queue_peek(start, count)
  Array(redis.zrange(:delayed_queue_schedule, start, start+count)).collect{|x| x.to_i}
end

#delayed_queue_schedule_sizeObject

Returns the size of the delayed queue schedule



114
115
116
# File 'lib/qu/extensions/scheduler/redis.rb', line 114

def delayed_queue_schedule_size
  redis.zcard :delayed_queue_schedule
end

#delayed_timestamp_size(timestamp) ⇒ Object

Returns the number of jobs for a given timestamp in the delayed queue schedule



119
120
121
# File 'lib/qu/extensions/scheduler/redis.rb', line 119

def delayed_timestamp_size(timestamp)
  redis.llen("delayed:#{timestamp.to_i}").to_i
end

#get_schedule(name) ⇒ Object

retrieve the schedule configuration for the given name



49
50
51
# File 'lib/qu/extensions/scheduler/redis.rb', line 49

def get_schedule(name)
  decode(redis.hget(:schedules, name))
end

#get_schedulesObject

gets the schedule as it exists in redis



18
19
20
21
22
23
24
25
26
27
28
# File 'lib/qu/extensions/scheduler/redis.rb', line 18

def get_schedules
  if redis.exists(:schedules)
    redis.hgetall(:schedules).tap do |h|
      h.each do |name, config|
        h[name] = decode(config)
      end
    end
  else
    nil
  end
end

#list_range(key, start = 0, count = 1) ⇒ Object

Does the dirty work of fetching a range of items from a Redis list and converting them into Ruby objects.



7
8
9
10
11
12
13
14
15
# File 'lib/qu/extensions/scheduler/redis.rb', line 7

def list_range(key, start = 0, count = 1)
  if count == 1
    decode redis.lindex(key, start)
  else
    Array(redis.lrange(key, start, start+count-1)).map do |item|
      decode item
    end
  end
end

#load_schedule!Object

Pulls the schedule from Qu.schedule and loads it into the rufus scheduler instance



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/qu/extensions/scheduler/redis.rb', line 77

def load_schedule!
  Qu::Scheduler.set_process_title "loading schedule"

  # Need to load the schedule from redis for the first time if dynamic
  Qu.reload_schedule! if Qu::Scheduler.dynamic

  Qu.logger.warn("Schedule empty! Set Qu.schedule") if Qu.schedule.empty?

  @@scheduled_jobs = {}

  Qu.schedule.each do |name, config|
    Qu::Scheduler.load_schedule_job(name, config)
  end
  redis.del(:schedules_changed)
  Qu::Scheduler.set_process_title "running"
end

#next_delayed_timestamp(at_time = nil) ⇒ Object

Returns the next delayed queue timestamp (don’t call directly)



125
126
127
128
129
# File 'lib/qu/extensions/scheduler/redis.rb', line 125

def next_delayed_timestamp(at_time=nil)
  items = redis.zrangebyscore :delayed_queue_schedule, '-inf', (at_time || Time.now).to_i, :limit => [0, 1]
  timestamp = items.nil? ? nil : Array(items).first
  timestamp.to_i unless timestamp.nil?
end

#next_item_for_timestamp(timestamp) ⇒ Object

Returns the next item to be processed for a given timestamp, nil if done. (don’t call directly) timestamp can either be in seconds or a datetime



134
135
136
137
138
139
140
141
142
# File 'lib/qu/extensions/scheduler/redis.rb', line 134

def next_item_for_timestamp(timestamp)
  key = "delayed:#{timestamp.to_i}"

  item = decode redis.lpop(key)

  # If the list is empty, remove it.
  clean_up_timestamp(key, timestamp)
  item
end

#remove_delayed(klass, *args) ⇒ Object

Given an encoded item, remove it from the delayed_queue

This method is potentially very expensive since it needs to scan through the delayed queue for every timestamp.



157
158
159
160
161
162
163
164
# File 'lib/qu/extensions/scheduler/redis.rb', line 157

def remove_delayed(klass, *args)
  destroyed = 0
  search = encode('klass' => klass.to_s, 'args' => args)
  Array(redis.keys("delayed:*")).each do |key|
    destroyed += redis.lrem key, 0, search
  end
  destroyed
end

#remove_delayed_job_from_timestamp(timestamp, klass, *args) ⇒ Object

Given a timestamp and job (klass + args) it removes all instances and returns the count of jobs removed.

O(N) where N is the number of jobs scheduled to fire at the given timestamp



171
172
173
174
175
176
# File 'lib/qu/extensions/scheduler/redis.rb', line 171

def remove_delayed_job_from_timestamp(timestamp, klass, *args)
  key = "delayed:#{timestamp.to_i}"
  count = redis.lrem key, 0, encode('klass' => klass.to_s, 'args' => args)
  clean_up_timestamp(key, timestamp)
  count
end

#remove_schedule(name) ⇒ Object

remove a given schedule by name



54
55
56
57
# File 'lib/qu/extensions/scheduler/redis.rb', line 54

def remove_schedule(name)
  redis.hdel(:schedules, name)
  redis.sadd(:schedules_changed, name)
end

#reset_delayed_queueObject

Clears all jobs created with enqueue_at or enqueue_in



145
146
147
148
149
150
151
# File 'lib/qu/extensions/scheduler/redis.rb', line 145

def reset_delayed_queue
  Array(redis.zrange(:delayed_queue_schedule, 0, -1)).each do |item|
    redis.del "delayed:#{item}"
  end

  redis.del :delayed_queue_schedule
end

#set_schedule(name, config) ⇒ Object

Create or update a schedule with the provided name and configuration.

Note: values for class and custom_job_class need to be strings, not constants.

Qu.set_schedule('some_job', { :class => 'SomeJob',
                              :every => '15mins',
                              :queue => 'high',
                              :args => '/tmp/poop' })


39
40
41
42
43
44
45
46
# File 'lib/qu/extensions/scheduler/redis.rb', line 39

def set_schedule(name, config)
  existing_config = get_schedule(name)
  unless existing_config && existing_config == config
    redis.hset(:schedules, name, encode(config))
    redis.sadd(:schedules_changed, name)
  end
  config
end

#update_scheduleObject



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/qu/extensions/scheduler/redis.rb', line 59

def update_schedule
  if redis.scard(:schedules_changed) > 0
    Qu::Scheduler.set_process_title "updating schedule"
    Qu.reload_schedule!
    while schedule_name = redis.spop(:schedules_changed)
      if Qu.schedule.keys.include?(schedule_name)
        Qu::Scheduler.unschedule_job(schedule_name)
        Qu::Scheduler.load_schedule_job(schedule_name, Qu.schedule[schedule_name])
      else
        Qu::Scheduler.unschedule_job(schedule_name)
      end
    end
    Qu::Scheduler.set_process_title "running"
  end
end