Class: DelayQueue
- Inherits:
-
Object
- Object
- DelayQueue
- Defined in:
- lib/delay_queue.rb
Instance Method Summary collapse
-
#acquire_lock ⇒ Object
:nodoc:.
-
#break_lock ⇒ Object
:nodoc:.
- #delete(item) ⇒ Object
- #delete_all! ⇒ Object
- #dequeue ⇒ Object
- #empty? ⇒ Boolean
-
#enqueue(item, options = { :delay => 0}) ⇒ Object
Enqueue a unique item with an optional delay.
- #include?(item) ⇒ Boolean
-
#initialize(redis, queue_name) ⇒ DelayQueue
constructor
A new instance of DelayQueue.
-
#release_lock ⇒ Object
:nodoc:.
- #size ⇒ Object
Constructor Details
#initialize(redis, queue_name) ⇒ DelayQueue
Returns a new instance of DelayQueue.
3 4 5 6 7 |
# File 'lib/delay_queue.rb', line 3
# Binds the queue to a Redis client and a queue key.
#
# redis      - client object that the other methods send Redis commands to.
# queue_name - key under which the queue is stored; the lock key is this
#              name prefixed with "lock.".
def initialize(redis, queue_name)
  @redis = redis
  @queue_name = queue_name
  # Interpolation instead of String#+ so a Symbol queue name works too
  # ('lock.' + :name raises TypeError).
  @lock_name = "lock.#{queue_name}"
end
Instance Method Details
#acquire_lock ⇒ Object
:nodoc:
60 61 62 |
# File 'lib/delay_queue.rb', line 60
# Tries to take the queue lock by issuing SETNX on the lock key.
#
# The stored value comes from new_lock_expiration, which is defined outside
# this listing (presumably a timestamp after which the lock counts as stale;
# verify against its definition).
#
# Returns whatever the client's setnx returns.
def acquire_lock # :nodoc:
  expiration = new_lock_expiration
  @redis.setnx(@lock_name, expiration)
end
#break_lock ⇒ Object
:nodoc:
68 69 70 71 |
# File 'lib/delay_queue.rb', line 68
# Tries to take over the lock when its stored expiration has passed.
#
# The lock value is read as an integer epoch timestamp. A missing value is
# treated the same as an expired one.
#
# Returns true when the caller now owns the lock, false otherwise.
def break_lock # :nodoc:
  # Look before writing. An unconditional GETSET on every poll replaces the
  # stored expiration with a fresh one each time, so (assuming
  # new_lock_expiration lies in the future) an abandoned lock would never be
  # observed as expired while callers keep retrying.
  current = @redis.get(@lock_name)
  return false unless current.nil? || Time.at(current.to_i) <= Time.now

  # Another client may have taken the lock between GET and GETSET, so the
  # value handed back by GETSET is checked again before claiming ownership.
  previous = @redis.getset(@lock_name, new_lock_expiration)
  previous.nil? || Time.at(previous.to_i) <= Time.now
end
#delete(item) ⇒ Object
13 14 15 |
# File 'lib/delay_queue.rb', line 13
# Removes +item+ from the queue by issuing ZREM against the queue key.
#
# Returns whatever the client's zrem returns.
def delete(item)
  queue_key = @queue_name
  @redis.zrem(queue_key, item)
end
#delete_all! ⇒ Object
17 18 19 |
# File 'lib/delay_queue.rb', line 17
# Drops the whole queue by deleting its Redis key.
#
# Returns whatever the client's del returns.
def delete_all!
  queue_key = @queue_name
  @redis.del(queue_key)
end
#dequeue ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/delay_queue.rb', line 45
# Removes and returns one item whose score is at or before the current time.
#
# Blocks until the queue lock is acquired (or a stale lock is broken), then
# reads at most one item with a score between 0 and now, removes it, and
# releases the lock.
#
# Returns the item, or nil when nothing is due.
def dequeue
  # Couldn't acquire or break the lock: wait and try again. A small sleep
  # value is actually faster than no sleep value, presumably because no delay
  # puts too much stress on Redis. Looping (rather than recursing) keeps the
  # stack flat however long the wait lasts.
  until acquire_lock || break_lock
    sleep 0.01
  end

  begin
    array = @redis.zrangebyscore(@queue_name, 0, Time.now.to_i, :limit => [0, 1])
    item = array.first if array
    @redis.zrem(@queue_name, item) if item
    item
  ensure
    # Release even when a Redis call above raises, so other consumers are not
    # left waiting for the lock to go stale.
    release_lock
  end
end
#empty? ⇒ Boolean
21 22 23 |
# File 'lib/delay_queue.rb', line 21
# Reports whether the queue currently holds no items.
#
# Returns true when #size equals 0, false otherwise.
def empty?
  queued = size
  queued == 0
end
#enqueue(item, options = { :delay => 0}) ⇒ Object
Enqueue a unique item with an optional delay
:item
-
A string
:options
-
An optional hash of one of the following options
:until
-
Earliest time at which this item may be dequeued
:delay
-
Number of seconds to wait before allowing this to be dequeued
36 37 38 39 40 41 42 43 |
# File 'lib/delay_queue.rb', line 36
# Enqueues a unique item, optionally delaying when it becomes available.
#
# item    - the value to store (added to the sorted set under the queue key).
# options - optional hash with one of:
#           :delay - number of seconds from now before the item may be dequeued
#           :until - time before which the item may not be dequeued
#           When both are given, :delay takes precedence. When neither is
#           given, the item is available immediately.
#
# Returns whatever the client's zadd returns.
def enqueue(item, options = { :delay => 0 })
  time =
    if options[:delay]
      Time.now + options[:delay]
    elsif options[:until]
      options[:until]
    else
      # Explicit "no delay" rather than relying on nil.to_i producing score 0.
      Time.now
    end
  @redis.zadd(@queue_name, time.to_i, item)
end
#include?(item) ⇒ Boolean
9 10 11 |
# File 'lib/delay_queue.rb', line 9
# Reports whether +item+ is currently in the queue.
#
# Membership is determined by asking Redis for the item's score in the
# queue's sorted set; a nil score means the item is absent.
#
# Returns true or false, as the predicate name and documented Boolean
# return type promise, instead of leaking the raw score.
def include?(item)
  !@redis.zscore(@queue_name, item).nil?
end
#release_lock ⇒ Object
:nodoc:
64 65 66 |
# File 'lib/delay_queue.rb', line 64
# Gives up the queue lock by deleting the lock key.
#
# Returns whatever the client's del returns.
def release_lock # :nodoc:
  lock_key = @lock_name
  @redis.del(lock_key)
end
#size ⇒ Object
25 26 27 |
# File 'lib/delay_queue.rb', line 25
# Counts the items in the queue via ZCARD on the queue key. The count
# includes every stored item, whether or not it is due yet.
#
# Returns whatever the client's zcard returns.
def size
  queue_key = @queue_name
  @redis.zcard(queue_key)
end