Class: DelayQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/delay_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(redis, queue_name) ⇒ DelayQueue

Returns a new instance of DelayQueue.



3
4
5
6
7
# File 'lib/delay_queue.rb', line 3

def initialize(redis, queue_name)
  @redis = redis
  @queue_name = queue_name
  @lock_name = 'lock.' + @queue_name
end

Instance Method Details

#acquire_lockObject

:nodoc:



60
61
62
# File 'lib/delay_queue.rb', line 60

def acquire_lock # :nodoc:
  @redis.setnx(@lock_name, new_lock_expiration)
end

#break_lockObject

:nodoc:



68
69
70
71
# File 'lib/delay_queue.rb', line 68

def break_lock # :nodoc:
  previous = @redis.getset(@lock_name, new_lock_expiration)
  previous.nil? || Time.at(previous.to_i) <= Time.now
end

#delete(item) ⇒ Object



13
14
15
# File 'lib/delay_queue.rb', line 13

def delete(item)
  @redis.zrem(@queue_name, item)
end

#delete_all!Object



17
18
19
# File 'lib/delay_queue.rb', line 17

def delete_all!
  @redis.del(@queue_name)
end

#dequeueObject



45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/delay_queue.rb', line 45

def dequeue
  if acquire_lock || break_lock
    array = @redis.zrangebyscore(@queue_name, 0, Time.now.to_i, :limit => [0, 1])
    item = array.first if array
    @redis.zrem(@queue_name, item) if item
    release_lock
    item
  else # couldn't acquire or break the lock. wait and try again
    # a small sleep value is actually faster than no sleep value, presumably because no
    # delay puts too much stress on Redis
    sleep 0.01
    dequeue
  end
end

#empty?Boolean

Returns:

  • (Boolean)


21
22
23
# File 'lib/delay_queue.rb', line 21

def empty?
  size == 0
end

#enqueue(item, options = { :delay => 0}) ⇒ Object

Enqueue a unique item with an optional delay

:item

A string

:options

An optional hash of one of the following options

:until

Time before which not to allow this item to be dequeued

:delay

Number of seconds to wait before allowing this to be dequeued



36
37
38
39
40
41
42
43
# File 'lib/delay_queue.rb', line 36

def enqueue(item, options={ :delay => 0})
  if options[:delay]
    time = Time.now + options[:delay]
  elsif options[:until]
    time = options[:until]
  end
  @redis.zadd(@queue_name, time.to_i, item)
end

#include?(item) ⇒ Boolean

Returns:

  • (Boolean)


9
10
11
# File 'lib/delay_queue.rb', line 9

def include?(item)
  @redis.zscore(@queue_name, item)
end

#release_lockObject

:nodoc:



64
65
66
# File 'lib/delay_queue.rb', line 64

def release_lock # :nodoc:
  @redis.del(@lock_name)
end

#sizeObject



25
26
27
# File 'lib/delay_queue.rb', line 25

def size
  @redis.zcard(@queue_name)
end