Class: Redis::Deque
- Inherits:
-
Object
- Object
- Redis::Deque
- Defined in:
- lib/redis/deque.rb
Constant Summary collapse
- VERSION =
'0.2.0'
Class Method Summary collapse
Instance Method Summary collapse
- #clear(clear_process_queue = false) ⇒ Object
- #commit(message) ⇒ Object
- #commit_all ⇒ Object
- #commit_last ⇒ Object
- #empty? ⇒ Boolean
-
#initialize(queue_name, options = {}) ⇒ Deque
constructor
A new instance of Deque.
- #length ⇒ Object (also: #size)
- #pop(non_block = false) ⇒ Object (also: #dec, #shift)
- #process(non_block = false, timeout = nil) ⇒ Object
- #push(obj) ⇒ Object (also: #enc, #<<)
- #refill ⇒ Object
- #unshift(obj) ⇒ Object
Constructor Details
#initialize(queue_name, options = {}) ⇒ Deque
Returns a new instance of Deque.
11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/redis/deque.rb', line 11 def initialize(queue_name, = {}) raise ArgumentError, 'queue_name must be a non-empty string' if !queue_name.is_a?(String) || queue_name.empty? raise ArgumentError, 'process_queue_name must be a non-empty string' if .key?(:process_queue_name) && (![:process_queue_name].is_a?(String) || [:process_queue_name].empty?) raise ArgumentError, 'queue_name and process_queue_name must be different' if [:process_queue_name] == queue_name @redis = [:redis] || Redis.current @queue_name = queue_name @process_queue_name = [:process_queue_name] || "#{queue_name}_process" @last_message = nil @timeout = [:timeout] ||= 0 end |
Class Method Details
.version ⇒ Object
7 8 9 |
# File 'lib/redis/deque.rb', line 7 def self.version "redis-deque version #{VERSION}" end |
Instance Method Details
#clear(clear_process_queue = false) ⇒ Object
27 28 29 30 |
# File 'lib/redis/deque.rb', line 27 def clear(clear_process_queue = false) @redis.del @queue_name @redis.del @process_queue_name if clear_process_queue end |
#commit(message) ⇒ Object
53 54 55 |
# File 'lib/redis/deque.rb', line 53 def commit() @redis.lrem(@process_queue_name, 0, ) end |
#commit_all ⇒ Object
61 62 63 |
# File 'lib/redis/deque.rb', line 61 def commit_all @redis.del @process_queue_name end |
#commit_last ⇒ Object
57 58 59 |
# File 'lib/redis/deque.rb', line 57 def commit_last commit @last_message end |
#empty? ⇒ Boolean
32 33 34 |
# File 'lib/redis/deque.rb', line 32 def empty? length <= 0 end |
#length ⇒ Object Also known as: size
23 24 25 |
# File 'lib/redis/deque.rb', line 23 def length @redis.llen @queue_name end |
#pop(non_block = false) ⇒ Object Also known as: dec, shift
44 45 46 47 48 49 50 51 |
# File 'lib/redis/deque.rb', line 44 def pop(non_block = false) @last_message = if non_block @redis.rpoplpush(@queue_name, @process_queue_name) else @redis.brpoplpush(@queue_name, @process_queue_name, @timeout) end @last_message end |
#process(non_block = false, timeout = nil) ⇒ Object
65 66 67 68 69 70 71 72 73 |
# File 'lib/redis/deque.rb', line 65 def process(non_block = false, timeout = nil) @timeout = timeout unless timeout.nil? loop do = pop(non_block) ret = yield if block_given? commit_last if ret break if .nil? || (non_block && empty?) end end |
#push(obj) ⇒ Object Also known as: enc, <<
36 37 38 |
# File 'lib/redis/deque.rb', line 36 def push(obj) @redis.lpush(@queue_name, obj) end |
#refill ⇒ Object
75 76 77 78 79 80 |
# File 'lib/redis/deque.rb', line 75 def refill while ( = @redis.lpop(@process_queue_name)) unshift() end true end |
#unshift(obj) ⇒ Object
40 41 42 |
# File 'lib/redis/deque.rb', line 40 def unshift(obj) @redis.rpush(@queue_name, obj) end |