Class: ParallelQueue
- Inherits:
-
Object
- Object
- ParallelQueue
- Defined in:
- lib/parallel_queue.rb
Instance Method Summary collapse
-
#acquire_lock ⇒ Object
:nodoc:.
-
#break_lock ⇒ Object
:nodoc:.
- #delete_all! ⇒ Object
- #delete_queue(id) ⇒ Object
- #dequeue ⇒ Object
- #dequeue_each(&block) ⇒ Object
- #empty? ⇒ Boolean
-
#enqueue(id, item) ⇒ Object
:item - A string.
-
#initialize(redis, queue_name, options = {}) ⇒ ParallelQueue
constructor
A new instance of ParallelQueue.
- #queue_count ⇒ Object
-
#release_lock ⇒ Object
:nodoc:.
Constructor Details
#initialize(redis, queue_name, options = {}) ⇒ ParallelQueue
Returns a new instance of ParallelQueue.
3 4 5 6 7 8 9 |
# File 'lib/parallel_queue.rb', line 3
#
# Builds a queue handle bound to a Redis client and a base queue name.
#
# @param redis [Object] Redis client; stored as-is and used for every command
# @param queue_name [String, Symbol] base name; also used to derive the lock key
# @param options [Hash] optional settings
# @option options [Integer] :maxlength when set, #enqueue trims each queue to
#   its newest +maxlength+ items
def initialize(redis, queue_name, options = {})
  @redis = redis
  @queue_name = queue_name
  # `|| nil` keeps the original normalization of a false value to nil.
  @maxlength = options[:maxlength] || nil
  @lock_name = "lock.#{@queue_name}"
  @current_queue_index = 0
end
Instance Method Details
#acquire_lock ⇒ Object
:nodoc:
101 102 103 |
# File 'lib/parallel_queue.rb', line 101
#
# Tries to take the queue lock by writing the value from new_lock_expiration
# to the lock key with SETNX.
#
# @return [Object] whatever the Redis client's +setnx+ call returns
def acquire_lock # :nodoc:
  expiration = new_lock_expiration
  @redis.setnx(@lock_name, expiration)
end
#break_lock ⇒ Object
:nodoc:
109 110 111 112 |
# File 'lib/parallel_queue.rb', line 109
#
# Tries to take over the lock by swapping in a new expiration with GETSET and
# inspecting the value that was stored before.
#
# Note that GETSET always writes the new expiration, even when the previous
# value turns out not to have expired yet.
#
# @return [Boolean] true when there was no previous value, or when the
#   previous value (read as epoch seconds) is not in the future
def break_lock # :nodoc:
  old_value = @redis.getset(@lock_name, new_lock_expiration)
  return true if old_value.nil?

  Time.at(old_value.to_i) <= Time.now
end
#delete_all! ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/parallel_queue.rb', line 84
#
# Removes every sub-queue and the list of queue names, then resets the
# current queue index to 0. The whole operation runs while holding the lock.
#
# Blocks (polling every 10 ms) until the lock is acquired or broken.
def delete_all!
  # Couldn't acquire or break the lock? Wait and try again.
  # A small sleep value is actually faster than no sleep value, presumably
  # because no delay puts too much stress on Redis.
  # Polling in a loop (rather than re-entering this method) keeps the call
  # stack flat no matter how long the lock stays contended.
  sleep 0.01 until acquire_lock || break_lock

  begin
    # Pop ids until the list is exhausted; lpop returning nil ends the loop,
    # so a nil id is never handed to delete_queue.
    while (id = @redis.lpop(list_of_queue_names))
      delete_queue(id)
    end
    @redis.del(list_of_queue_names)
    self.current_queue_index = 0
  ensure
    # Release the lock even when a Redis call raises, so other clients are
    # not stalled until the lock expires.
    release_lock
  end
end
#delete_queue(id) ⇒ Object
12 13 14 15 |
# File 'lib/parallel_queue.rb', line 12
#
# Drops one sub-queue: removes a single occurrence of +id+ from the list of
# queue names, then deletes the Redis key that queue_from_id maps +id+ to.
#
# @param id [Object] identifier of the sub-queue to remove
# @return [Object] whatever the Redis client's +del+ call returns
def delete_queue(id)
  names_key = list_of_queue_names
  @redis.lrem(names_key, 1, id)

  queue_key = queue_from_id(id)
  @redis.del(queue_key)
end
#dequeue ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/parallel_queue.rb', line 35
#
# Pops the next item from the sub-queue found at the current queue index,
# while holding the queue lock.
#
# Blocks (polling every 10 ms) until the lock is acquired or broken. A
# sub-queue is removed via delete_queue once it is empty after the pop, and
# increment_current_queue_index is called on every pass.
#
# @return [Object, nil] the popped item; nil when there is no queue id at the
#   current index or the selected sub-queue yields nothing
def dequeue
  # Couldn't acquire or break the lock? Wait and try again.
  # A small sleep value is actually faster than no sleep value, presumably
  # because no delay puts too much stress on Redis.
  #
  # Experiment:
  #   started dequeue_demo.rb in two terminals
  #   started enqueue_demo.rb in a third terminal
  #   enqueue_demo.rb is set to run until it enqueues 40,000 times
  #   dequeue_demo.rb times its run, starting from the first time there is
  #   data to dequeue, and running through until all data have been dequeued
  #   times reported are the average of both dequeue terminals (which
  #   consistently ended within 0.1 second of one another)
  #
  #   Sleep Delay   Run Duration (in seconds)
  #   0.01          22.6
  #   0.001         22.3
  #   no sleep      25.1
  #   0.001         23.8
  #   0.001         23.8
  #   0.01          22.6
  #   0.01          22.7
  #   no sleep      25.1
  #   no sleep      25.0
  #
  # Polling in a loop (rather than re-entering this method) keeps the call
  # stack flat no matter how long the lock stays contended.
  sleep 0.01 until acquire_lock || break_lock

  begin
    current_id = @redis.lindex(list_of_queue_names, current_queue_index)
    item = nil

    # lindex returns nil when nothing is stored at the index (empty list or
    # stale index); in that case there is no queue to pop from or delete.
    if current_id
      # pop from the current queue
      current_queue = queue_from_id(current_id)
      item = @redis.lpop(current_queue)
      delete_queue(current_id) if @redis.llen(current_queue) == 0
    end

    increment_current_queue_index
    item
  ensure
    # Release the lock even when a Redis call raises, so other clients are
    # not stalled until the lock expires.
    release_lock
  end
end
#dequeue_each(&block) ⇒ Object
74 75 76 77 78 79 80 81 82 |
# File 'lib/parallel_queue.rb', line 74
#
# Resets the current queue index to 0 and then dequeues repeatedly, passing
# every non-nil item to the block. Iteration stops once the current queue
# index is back at 0 or has reached queue_count.
#
# Without a block an Enumerator is returned and nothing is dequeued until it
# is iterated (previously an item was dequeued and then lost to a
# LocalJumpError).
#
# @yieldparam item [Object] each dequeued, non-nil item
# @return [Enumerator, nil] an Enumerator when no block is given, otherwise nil
def dequeue_each(&block)
  return to_enum(:dequeue_each) unless block
  return if queue_count == 0

  self.current_queue_index = 0
  loop do
    item = dequeue
    block.call(item) unless item.nil?
    # Run at least once, then continue only while the index is strictly
    # inside (0, queue_count).
    break unless current_queue_index > 0 && current_queue_index < queue_count
  end
end
#empty? ⇒ Boolean
17 18 19 |
# File 'lib/parallel_queue.rb', line 17
#
# Reports whether there are no sub-queues registered.
#
# @return [Boolean] true when queue_count is 0
def empty?
  count = queue_count
  count == 0
end
#enqueue(id, item) ⇒ Object
:item - A string
27 28 29 30 31 32 33 |
# File 'lib/parallel_queue.rb', line 27
#
# Appends +item+ to the sub-queue for +id+ and moves +id+ to the tail of the
# list of queue names.
#
# @param id [Object] identifier of the sub-queue
# @param item [String] the value to append
# @return [Object] whatever the final +rpush+ on the list of queue names returns
def enqueue(id, item)
  target = queue_from_id(id)
  @redis.rpush(target, item)

  if @maxlength
    # Negative indices count from the tail, so only the newest @maxlength
    # entries are kept.
    @redis.ltrim(target, -@maxlength, -1)
  end

  # Remove one existing occurrence of the id before appending it again.
  @redis.lrem(list_of_queue_names, 1, id)
  @redis.rpush(list_of_queue_names, id)
end
#queue_count ⇒ Object
21 22 23 |
# File 'lib/parallel_queue.rb', line 21
#
# Number of entries in the list of queue names.
#
# @return [Object] whatever the Redis client's +llen+ call returns
def queue_count
  names_key = list_of_queue_names
  @redis.llen(names_key)
end
#release_lock ⇒ Object
:nodoc:
105 106 107 |
# File 'lib/parallel_queue.rb', line 105
#
# Gives up the queue lock by deleting the lock key. The key is deleted
# unconditionally; no check is made of the value currently stored there.
#
# @return [Object] whatever the Redis client's +del+ call returns
def release_lock # :nodoc:
  lock_key = @lock_name
  @redis.del(lock_key)
end