Class: RedisScheduler

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/redis-scheduler.rb

Overview

A simple, production-ready chronological scheduler for Redis.

Use #schedule! to add an item to be processed at an arbitrary point in time. The item will be converted to a string and returned to you as such.

Use #each to iterate over those items at the scheduled time. This call iterates over all items that are scheduled on or before the current time, in chronological order. In blocking mode, this call will wait forever until such items become available, and will never terminate. In non-blocking mode, this call will only iterate over ready items and will terminate when there are no items ready for processing.

Use #items to simply iterate over all items in the queue, for debugging purposes.
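The semantics above can be pictured with a small in-memory model. This is only an illustrative sketch: ToySchedule and each_ready are hypothetical names, and the real class keeps its entries in Redis rather than in a Ruby array.

```ruby
# In-memory model of the scheduling semantics. ToySchedule is a
# hypothetical stand-in, not part of redis-scheduler.
class ToySchedule
  def initialize
    @entries = [] # [epoch_seconds, item_string] pairs
  end

  # Items are stored as strings alongside their scheduled time.
  def schedule!(item, time)
    @entries << [time.to_f, item.to_s]
    self
  end

  def size
    @entries.size
  end

  # Non-blocking iteration: yield every item due at or before `now`,
  # earliest first, then return.
  def each_ready(now = Time.now)
    loop do
      entry = @entries.min_by(&:first)
      break if entry.nil? || entry.first > now.to_f
      @entries.delete_at(@entries.index(entry))
      yield entry.last, Time.at(entry.first)
    end
  end
end

now = Time.now
toy = ToySchedule.new
toy.schedule!(:later, now + 3600)
toy.schedule!("second", now - 5)
toy.schedule!("first", now - 10)

seen = []
toy.each_ready(now) { |item, _at| seen << item }
# `seen` now holds the two due items in chronological order; the item
# scheduled an hour ahead stays in the schedule.
```

A blocking consumer would differ only in that it waits and retries instead of returning when nothing is due.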

Exceptions during processing

Any exceptions during #each will result in the item being re-added to the schedule at the original time.
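As a rough sketch of this re-queue behavior, the pattern looks like the following. It uses a plain array in place of Redis, and process_with_requeue is a hypothetical helper named here for illustration only.

```ruby
# Sketch of "put the item back on failure". `pending` is a plain Array of
# [item, time] pairs standing in for the Redis-backed schedule.
def process_with_requeue(pending)
  while (entry = pending.shift)
    item, at = entry
    begin
      yield item, at
    rescue StandardError
      pending.push([item, at]) # re-add at the original time
      raise                    # the caller still sees the exception
    end
  end
end

pending = [["a", Time.at(1)], ["b", Time.at(2)]]
begin
  process_with_requeue(pending) { |item, _at| raise "boom" if item == "a" }
rescue RuntimeError => e
  failure = e.message
end
# "a" is back in `pending` with its original time, and iteration stopped
# at the failure, so "b" has not been processed.
```

Because the exception is re-raised, a consumer that should keep running after a failed item needs its own rescue around the iteration.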

Multiple producers and consumers

Multiple producers and consumers are fine.

Concurrent reads and writes

Concurrent reads and writes are fine.

Segfaults

The scheduler maintains a “processing set” of items currently being processed. If a process dies (i.e. not as a result of a Ruby exception, but as the result of a segfault), the item will remain in this set but will no longer appear in the schedule. To avoid losing scheduled work due to segfaults, you must periodically iterate through this set using #processing_set_items and recover any items that have been abandoned. Setting a proper 'descriptor' argument in #each is suggested.
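One way to approach this recovery is to flag entries that have sat in the processing set longer than any healthy job should take. The sketch below relies only on #processing_set_items as documented; the helper name stale_processing_items and the age threshold are assumptions, and deciding what to do with a stale entry is left to the caller.

```ruby
# Returns the [item, started_at, descriptor] tuples that have been in the
# processing set for longer than max_age seconds. `scheduler` is anything
# that responds to #processing_set_items the way RedisScheduler does.
def stale_processing_items(scheduler, max_age, now = Time.now)
  scheduler.processing_set_items.select do |_item, started_at, _descriptor|
    now - started_at > max_age
  end
end

# Stand-in object for demonstration, so the sketch runs without Redis.
FakeScheduler = Struct.new(:processing_set_items)
now = Time.now
fake = FakeScheduler.new([
  ["job-1", now - 900, "worker-a"], # 15 minutes in flight
  ["job-2", now - 5,   "worker-b"]  # just started
])

stale = stale_processing_items(fake, 600, now)
stale.each do |item, started_at, descriptor|
  puts "possibly abandoned: #{item} (since #{started_at}, #{descriptor})"
end
```

An age threshold alone cannot distinguish a crashed worker from a slow one, which is why a meaningful descriptor helps.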

Defined Under Namespace

Classes: InvalidEntryException, ItemEnumerator

Constant Summary

POLL_DELAY = 1.0 # seconds
CAS_DELAY = 0.5 # seconds

Constructor Details

#initialize(redis, opts = {}) ⇒ RedisScheduler

Options:

  • namespace: prefix for Redis keys, e.g. “scheduler/”.

  • blocking: whether #each should block (true) or return immediately (false) when there are no items ready to be processed.

  • uniq: when false (default), the same item can be scheduled for multiple times. When true, scheduling the same item multiple times only updates its scheduled time, and does not represent the item multiple times in the schedule.

Note that uniq is set on a per-schedule basis and cannot be changed. Once a uniq schedule is created, it is forever uniq (until #reset! is called, at least). Attempts to use non-uniq queues in a uniq manner, or vice versa, will result in undefined behavior (probably errors).

Note also that nonblocking mode may still actually block momentarily as part of the check-and-set semantics, i.e. block during contention from multiple clients. “Nonblocking” refers to whether the scheduler should wait until events in the schedule are ready, or only return those items that are ready currently.
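To make the difference between the two uniq settings concrete, here is an in-memory illustration. It models the schedule as a Hash from entry to time; the toy_schedule helper and the sequence-number trick for non-uniq entries are assumptions made for this sketch, not a description of how the library encodes its entries.

```ruby
# Toy model: `entries` maps an entry key to its scheduled time.
# With uniq, the key is the item itself, so rescheduling overwrites the
# time. Without uniq, each call gets a distinct key (item plus a sequence
# number), so the same item can appear several times.
def toy_schedule(entries, item, time, uniq:)
  key = uniq ? item.to_s : [item.to_s, entries.size]
  entries[key] = time.to_f
  entries
end

uniq_entries = {}
toy_schedule(uniq_entries, "report", Time.at(100), uniq: true)
toy_schedule(uniq_entries, "report", Time.at(200), uniq: true)
# uniq_entries holds a single "report" entry, now scheduled at 200.0

plain_entries = {}
toy_schedule(plain_entries, "report", Time.at(100), uniq: false)
toy_schedule(plain_entries, "report", Time.at(200), uniq: false)
# plain_entries holds two "report" entries, at 100.0 and 200.0
```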


# File 'lib/redis-scheduler.rb', line 59

def initialize redis, opts={}
  @redis = redis
  @namespace = opts[:namespace]
  @blocking = opts[:blocking]
  @uniq = opts[:uniq]

  @queue = [@namespace, "q"].join
  @processing_set = [@namespace, "processing"].join
  @counter = [@namespace, "counter"].join
end
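The key names built in the constructor follow directly from the namespace option. A quick check of that string handling, using the example namespace from the option list (redis_keys_for is a hypothetical helper, not part of the class):

```ruby
# Mirrors the key construction in #initialize for a given namespace.
def redis_keys_for(namespace)
  {
    queue:          [namespace, "q"].join,
    processing_set: [namespace, "processing"].join,
    counter:        [namespace, "counter"].join
  }
end

with_prefix = redis_keys_for("scheduler/")
# with_prefix[:queue] is "scheduler/q", and likewise for the other keys.

without_prefix = redis_keys_for(nil)
# A nil namespace contributes nothing to the join, so the keys are
# simply "q", "processing" and "counter".
```

Two schedulers sharing one Redis database therefore need distinct namespaces; otherwise they resolve to the same three keys.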

Instance Method Details

#each(descriptor = nil) ⇒ Object

Yields items along with their scheduled times. Only returns items on or after their scheduled times. Items are returned as strings. If @blocking is false, will stop once there are no more items that can be processed immediately; if it's true, will wait until items become available (and never terminate).

Descriptor is an optional string that will be associated with this item while in the processing set. This is useful for providing whatever information you need to determine whether the item needs to be recovered when iterating through the processing set.


# File 'lib/redis-scheduler.rb', line 96

def each descriptor=nil
  while(x = get(descriptor))
    item, processing_descriptor, at = x
    begin
      yield item, at
    rescue Exception # back in the hole!
      schedule! item, at
      raise
    ensure
      cleanup! processing_descriptor
    end
  end
end
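A descriptor can be any string. As one possible convention (an assumption of this sketch, not something the library prescribes), a worker could record its host name and process ID so that a recovery job can later tell whether the worker still exists. build_descriptor and worker_alive? are hypothetical helpers.

```ruby
require "socket"

# Hypothetical descriptor format: "<hostname>:<pid>".
def build_descriptor
  "#{Socket.gethostname}:#{Process.pid}"
end

# Returns true or false for workers on this host, or nil when the
# descriptor names another host and liveness cannot be checked locally.
def worker_alive?(descriptor)
  host, _, pid = descriptor.rpartition(":")
  return nil unless host == Socket.gethostname
  Process.kill(0, pid.to_i) # signal 0 only checks that the process exists
  true
rescue Errno::ESRCH
  false # no such process
rescue Errno::EPERM
  true  # process exists but belongs to another user
end

descriptor = build_descriptor
# A consumer would pass this along, for example:
#   scheduler.each(descriptor) { |item, at| ... }
```

The same string then comes back as the third element of each tuple from #processing_set_items.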

#items ⇒ Object

Returns an Enumerable of [item, scheduled time] pairs, which can be used to iterate over all the items in the queue, in order from earliest- to latest-scheduled, regardless of whether their scheduled time has arrived.

Note that this view is not synchronized with write operations, and thus may be inconsistent (e.g. return duplicates, miss items, etc) if changes to the schedule happen while iterating.

For these reasons, this is mainly useful for debugging purposes.


# File 'lib/redis-scheduler.rb', line 119

def items; ItemEnumerator.new(@redis, @queue, @uniq) end
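For debugging, the pairs returned by #items can simply be printed. A minimal sketch, assuming only what is documented above (each element is an [item, scheduled time] pair); the dump_schedule helper and the stand-in object are hypothetical, and times are compared via to_f so the sketch does not depend on whether the scheduled time is a Time or a number.

```ruby
require "stringio"

# Writes one line per scheduled item to `io`, marking items already due.
def dump_schedule(scheduler, io = $stdout, now = Time.now)
  scheduler.items.each do |item, at|
    status = at.to_f <= now.to_f ? "due" : "pending"
    io.puts "#{at.to_f}\t#{status}\t#{item}"
  end
end

# Stand-in with the same #items shape, so the sketch runs without Redis.
FakeSchedule = Struct.new(:items)
now = Time.at(1_000)
fake = FakeSchedule.new([["a", Time.at(900)], ["b", Time.at(1_100)]])

out = StringIO.new
dump_schedule(fake, out, now)
# `out.string` now contains one tab-separated line per item.
```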

#processing_set_items ⇒ Object

Returns an Array of [item, timestamp, descriptor] tuples representing the set of in-process items. The timestamp corresponds to the time at which the item was removed from the schedule for processing.


# File 'lib/redis-scheduler.rb', line 124

def processing_set_items
  @redis.smembers(@processing_set).map do |x|
    item, timestamp, descriptor = Marshal.load(x)
    [item, Time.at(timestamp), descriptor]
  end
end

#processing_set_size ⇒ Object

Returns the total number of items currently being processed.


# File 'lib/redis-scheduler.rb', line 84

def processing_set_size; @redis.scard @processing_set end

#reset! ⇒ Object

Drop all data and reset the schedule entirely.


# File 'lib/redis-scheduler.rb', line 76

def reset!
  [@queue, @processing_set, @counter].each { |k| @redis.del k }
end

#schedule!(item, time) ⇒ Object

Schedule an item at a specific time. item will be converted to a string.


# File 'lib/redis-scheduler.rb', line 71

def schedule! item, time
  @redis.zadd @queue, time.to_f, make_entry(item)
end
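Because items come back as strings, structured payloads need an explicit encoding. One common approach, shown here as a sketch rather than anything the library requires, is to serialize to JSON before scheduling and parse inside the consumer. The payload fields are made up for the example, and the score shown is simply what time.to_f produces.

```ruby
require "json"

payload = { "type" => "send_email", "user_id" => 42 }
run_at  = Time.at(1_700_000_000.5)

item  = JSON.generate(payload) # the string that would be passed to #schedule!
score = run_at.to_f            # the value #schedule! hands to zadd as the score

# With a real scheduler: scheduler.schedule!(item, run_at)
# In the consumer, the string is parsed back into a Hash:
decoded = JSON.parse(item)
```

Since the score is a float of epoch seconds, sub-second differences between scheduled times are preserved in the ordering.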

#size ⇒ Object

Return the total number of items in the schedule.


# File 'lib/redis-scheduler.rb', line 81

def size; @redis.zcard @queue end