Class: RedisQueue
- Inherits:
-
Object
- Object
- RedisQueue
- Defined in:
- lib/redis_queue.rb
Defined Under Namespace
Classes: RedisConnection
Constant Summary
# Lua fragment shared by the :push and :repush scripts. As written it reads
# ARGV[1] as the list key, ARGV[2] as the value and ARGV[3] as a priority
# flag (the literal string 'true' selects the priority path):
#
# * priority  - LINSERT the value before the first '' element, which acts as
#               a separator between priority and regular entries. When
#               LINSERT answers -1 or 0 (no separator / no list), a ''
#               separator is LPUSHed first and the value LPUSHed in front.
# * otherwise - RPUSH the value onto the tail of the list.
PUSH_CODE =
  " if ARGV[3] == 'true' then local insert = redis.call('linsert', ARGV[1], 'before', '', ARGV[2]) if insert == -1 or insert == 0 then redis.call('lpush', ARGV[1], '') redis.call('lpush', ARGV[1], ARGV[2]) end else redis.call('rpush', ARGV[1], ARGV[2]) end".freeze

# Lua sources keyed by script name. Every script addresses its keys through
# ARGV (ARGV[1] is the base list key in all but :init_from's source hash).
#
# NOTE(review): Redis documents KEYS, not ARGV, as the place for key names;
# changing that would also require changing the (not shown) caller.
#
# push      - PUSH_CODE as is.
# nonblpop  - LPOP until a non-'' value (or nothing) comes back; a popped
#             message is recorded in "<list>_in_use" with ARGV[2] as value.
# touch     - same pop loop, but the message is RPUSHed back onto the list.
# repush    - PUSH_CODE, then the value is removed from "<list>_in_use".
# fail      - HSET "<list>_failed" ARGV[2] => ARGV[3]; HDEL from "_in_use".
# done      - HSET "<list>_done" ARGV[2] => ARGV[3]; HDEL from "_in_use".
# unpop     - LPUSH ARGV[2] back onto the list; HDEL from "_in_use".
# init_from - for every field of hash ARGV[2] whose numeric value is below
#             ARGV[3]: LPUSH the field onto list ARGV[1] and HDEL it.
#             NOTE(review): table.getn is the pre-5.1 length function; `#vals`
#             is the current Lua idiom. Left untouched here.
SCRIPTS = {
  push: PUSH_CODE,
  nonblpop: " local message = '' while message == '' do message = redis.call('lpop', ARGV[1]) end if message then redis.call('hset', ARGV[1]..'_in_use', message, ARGV[2]) end return message ",
  touch: " local message = '' while message == '' do message = redis.call('lpop', ARGV[1]) end if message then redis.call('rpush', ARGV[1], message) end return message ",
  repush: " #{PUSH_CODE} redis.call('hdel', ARGV[1]..'_in_use', ARGV[2]) ",
  fail: " redis.call('hset', ARGV[1]..'_failed', ARGV[2], ARGV[3]) redis.call('hdel', ARGV[1]..'_in_use', ARGV[2]) ",
  done: " redis.call('hset', ARGV[1]..'_done', ARGV[2], ARGV[3]) redis.call('hdel', ARGV[1]..'_in_use', ARGV[2]) ",
  unpop: " redis.call('lpush', ARGV[1], ARGV[2]) redis.call('hdel', ARGV[1]..'_in_use', ARGV[2]) ",
  init_from: " local vals = redis.call('hkeys', ARGV[2]) for i = 1, table.getn(vals) do local timestamp = redis.call('hget', ARGV[2], vals[i]) if tonumber(timestamp) < tonumber(ARGV[3]) then redis.call('lpush', ARGV[1], vals[i]) redis.call('hdel', ARGV[2], vals[i]) end end "
}.freeze
Instance Method Summary
- #clear ⇒ Object
- #done(message) ⇒ Object
- #done_list ⇒ Object
- #done_size ⇒ Object
- #fail(message) ⇒ Object
- #failed_list ⇒ Object
- #failed_size ⇒ Object
- #forget(message) ⇒ Object
- #in_use_list ⇒ Object
- #in_use_size ⇒ Object
- #initialize(args = {}) ⇒ RedisQueue (constructor): returns a new instance of RedisQueue.
- #list ⇒ Object
- #pop(block: true) ⇒ Object
- #print_contents ⇒ Object
- #print_stats ⇒ Object
- #push(message, priority = false) ⇒ Object
- #remove(message) ⇒ Object
- #repush(message, priority = false) ⇒ Object
- #reset(older_than: nil) ⇒ Object
- #restart ⇒ Object
- #size ⇒ Object
- #touch(block: true) ⇒ Object
- #unpop(message) ⇒ Object
Constructor Details
#initialize(args = {}) ⇒ RedisQueue
Returns a new instance of RedisQueue.
63 64 65 66 67 68 69 |
# File 'lib/redis_queue.rb', line 63
#
# Builds a queue bound to one Redis list.
#
# @param args [Hash] options; :id names the queue (default :messages) and is
#   removed before the remaining options (including :url, default
#   'redis://localhost:6379/0') are handed to RedisConnection.
# @return [RedisQueue]
def initialize(args = {})
  defaults = { id: :messages, url: 'redis://localhost:6379/0' }
  connection_options = defaults.merge(args)
  @id = connection_options.delete(:id)
  # Two separate connections are built from the same options; the second is
  # presumably reserved for blocking pops (its use is not visible here).
  @redis = RedisConnection.new(connection_options)
  @redis_blocking = RedisConnection.new(connection_options)
  load_scripts
end
Instance Method Details
#clear ⇒ Object
167 168 169 170 171 172 173 174 |
# File 'lib/redis_queue.rb', line 167
#
# Deletes the queue list and its three bookkeeping hashes
# ("_in_use", "_done", "_failed"), in that order.
#
# @return [Object] whatever @redis.run yields back for the final DEL
def clear
  @redis.run do |conn|
    keys = [@id, "#{@id}_in_use", "#{@id}_done", "#{@id}_failed"]
    # map + last keeps the block's value equal to the last DEL's reply.
    keys.map { |key| conn.del(key) }.last
  end
end
#done(message) ⇒ Object
86 87 88 |
# File 'lib/redis_queue.rb', line 86
#
# Runs the :done script for +message+. Going by the script text, that stores
# the message in the "<id>_done" hash with the current `now` value and removes
# it from "<id>_in_use" (assuming `script` forwards its arguments as ARGV in
# order — the helper is not shown here).
#
# @param message [Object] the queue entry to mark as done
# @return [Object] whatever `script` returns
def done(message)
  script :done, @id, message, now
end
#done_list ⇒ Object
141 142 143 |
# File 'lib/redis_queue.rb', line 141
#
# Reads the whole "<id>_done" hash.
#
# @return [Hash] the HGETALL reply converted with Hash[]
def done_list
  entries = @redis.run do |conn|
    conn.hgetall("#{@id}_done")
  end
  Hash[entries]
end
#done_size ⇒ Object
125 126 127 |
# File 'lib/redis_queue.rb', line 125
#
# Number of fields in the "<id>_done" hash.
#
# @return [Integer] the HLEN reply coerced with #to_i
def done_size
  count = @redis.run { |conn| conn.hlen("#{@id}_done") }
  count.to_i
end
#fail(message) ⇒ Object
82 83 84 |
# File 'lib/redis_queue.rb', line 82
#
# Runs the :fail script for +message+. Going by the script text, that stores
# the message in the "<id>_failed" hash with the current `now` value and
# removes it from "<id>_in_use" (assuming `script` forwards its arguments as
# ARGV in order — the helper is not shown here).
#
# NOTE(review): inside this class the method shadows Kernel#fail for calls
# made without an explicit receiver.
#
# @param message [Object] the queue entry to mark as failed
# @return [Object] whatever `script` returns
def fail(message)
  script :fail, @id, message, now
end
#failed_list ⇒ Object
145 146 147 |
# File 'lib/redis_queue.rb', line 145
#
# Reads the whole "<id>_failed" hash.
#
# @return [Hash] the HGETALL reply converted with Hash[]
def failed_list
  entries = @redis.run do |conn|
    conn.hgetall("#{@id}_failed")
  end
  Hash[entries]
end
#failed_size ⇒ Object
129 130 131 |
# File 'lib/redis_queue.rb', line 129
#
# Number of fields in the "<id>_failed" hash.
#
# @return [Integer] the HLEN reply coerced with #to_i
def failed_size
  count = @redis.run { |conn| conn.hlen("#{@id}_failed") }
  count.to_i
end
#forget(message) ⇒ Object
98 99 100 |
# File 'lib/redis_queue.rb', line 98
#
# Drops +message+ from the "<id>_in_use" hash without touching the queue list
# or the done/failed hashes.
#
# @param message [Object] the in-use entry to delete
# @return [Object] whatever @redis.run yields back for the HDEL
def forget(message)
  @redis.run { |redis| redis.hdel "#{@id}_in_use", message }
end
#in_use_list ⇒ Object
149 150 151 |
# File 'lib/redis_queue.rb', line 149
#
# Reads the whole "<id>_in_use" hash.
#
# @return [Hash] the HGETALL reply converted with Hash[]
def in_use_list
  entries = @redis.run do |conn|
    conn.hgetall("#{@id}_in_use")
  end
  Hash[entries]
end
#in_use_size ⇒ Object
133 134 135 |
# File 'lib/redis_queue.rb', line 133
#
# Number of fields in the "<id>_in_use" hash.
#
# @return [Integer] the HLEN reply coerced with #to_i
def in_use_size
  count = @redis.run { |conn| conn.hlen("#{@id}_in_use") }
  count.to_i
end
#list ⇒ Object
137 138 139 |
# File 'lib/redis_queue.rb', line 137
#
# Reads every element of the queue list (LRANGE 0..-1).
#
# @return [Object] whatever @redis.run yields back for the LRANGE
def list
  @redis.run do |conn|
    conn.lrange(@id, 0, -1)
  end
end
#pop(block: true) ⇒ Object
71 72 73 74 75 76 |
# File 'lib/redis_queue.rb', line 71
#
# Takes the next message off the queue.
#
# With block: false the call is delegated to `nonblpop`. Otherwise a message
# is obtained from `blpop` and, when one came back, recorded in the
# "<id>_in_use" hash with the current `now` value. (`nonblpop`, `blpop` and
# `now` are helpers not shown here.)
#
# NOTE(review): the `message` local and the trailing `message` return were
# reconstructed — the listing covers six source lines (71-76).
#
# @param block [Boolean] use the blocking pop path when true
# @return [Object, nil] the popped message; nil when `blpop` produced nothing
def pop(block: true)
  return nonblpop unless block

  message = blpop
  @redis.run { |redis| redis.hset "#{@id}_in_use", message, now } if message
  message
end
#print_contents ⇒ Object
160 161 162 163 164 165 |
# File 'lib/redis_queue.rb', line 160
#
# Prints the contents of the queue and of each bookkeeping hash to stdout,
# one line per collection: enqueued, in use, failed, done.
#
# @return [nil]
def print_contents
  [
    ['enqueued', :list],
    ['in use', :in_use_list],
    ['failed', :failed_list],
    ['done', :done_list]
  ].each do |label, reader|
    # Each reader is invoked right before its own line is printed.
    puts "#{@id} #{label}: #{send(reader)}"
  end
  nil
end
#print_stats ⇒ Object
153 154 155 156 157 158 |
# File 'lib/redis_queue.rb', line 153
#
# Prints the size of the queue and of each bookkeeping hash to stdout, one
# line per collection: enqueued, in use, failed, done.
#
# @return [nil]
def print_stats
  [
    ['enqueued', :size],
    ['in use', :in_use_size],
    ['failed', :failed_size],
    ['done', :done_size]
  ].each do |label, reader|
    # Each reader is invoked right before its own line is printed.
    puts "#{@id} #{label}: #{send(reader)}"
  end
  nil
end
#push(message, priority = false) ⇒ Object
78 79 80 |
# File 'lib/redis_queue.rb', line 78
#
# Runs the :push script for +message+. Going by the script text, a priority
# message is inserted ahead of the '' separator and a regular one is appended
# to the tail (assuming `script` forwards its arguments as ARGV in order — the
# helper is not shown here).
#
# @param message [Object] the entry to enqueue
# @param priority [Boolean] enqueue on the priority side when true
# @return [Object] whatever `script` returns
def push(message, priority = false)
  script :push, @id, message, priority
end
#remove(message) ⇒ Object
102 103 104 |
# File 'lib/redis_queue.rb', line 102
#
# Removes +message+ from the queue list via LREM with a count of 0 (in Redis,
# count 0 removes every matching element).
#
# @param message [Object] the entry to delete from the list
# @return [Object] whatever @redis.run yields back for the LREM
def remove(message)
  @redis.run { |redis| redis.lrem @id, 0, message }
end
#repush(message, priority = false) ⇒ Object
94 95 96 |
# File 'lib/redis_queue.rb', line 94
#
# Runs the :repush script for +message+. Going by the script text, that
# enqueues the message exactly like #push and then removes it from
# "<id>_in_use" (assuming `script` forwards its arguments as ARGV in order —
# the helper is not shown here).
#
# @param message [Object] the in-use entry to put back on the queue
# @param priority [Boolean] enqueue on the priority side when true
# @return [Object] whatever `script` returns
def repush(message, priority = false)
  script :repush, @id, message, priority
end
#reset(older_than: nil) ⇒ Object
113 114 115 |
# File 'lib/redis_queue.rb', line 113
#
# Hands the "<id>_in_use" hash and the optional cutoff to `init_from` (helper
# not shown; presumably it re-enqueues in-use entries older than the cutoff).
#
# @param older_than [Object, nil] cutoff forwarded unchanged to `init_from`
# @return [Object] whatever `init_from` returns
def reset(older_than: nil)
  in_use_key = "#{@id}_in_use"
  init_from(in_use_key, older_than)
end
#restart ⇒ Object
117 118 119 |
# File 'lib/redis_queue.rb', line 117
#
# Hands the "<id>_done" hash to `init_from` (helper not shown; presumably it
# re-enqueues the entries recorded as done).
#
# @return [Object] whatever `init_from` returns
def restart
  done_key = "#{@id}_done"
  init_from(done_key)
end
#size ⇒ Object
121 122 123 |
# File 'lib/redis_queue.rb', line 121
#
# Length of the queue list.
#
# @return [Integer] the LLEN reply coerced with #to_i
def size
  length = @redis.run { |conn| conn.llen(@id) }
  length.to_i
end
#touch(block: true) ⇒ Object
106 107 108 109 110 111 |
# File 'lib/redis_queue.rb', line 106
#
# Peeks at the next message by popping it and putting it straight back.
#
# With block: false the call is delegated to `nonbltouch`. Otherwise a message
# is obtained from `blpop` and re-enqueued through #push with default
# priority. (`nonbltouch` and `blpop` are helpers not shown here.)
#
# NOTE(review): the `message` local and the trailing `message` return were
# reconstructed — the listing covers six source lines (106-111).
#
# @param block [Boolean] use the blocking pop path when true
# @return [Object, nil] the touched message; nil when `blpop` produced nothing
def touch(block: true)
  return nonbltouch unless block

  message = blpop
  # Same guard #pop applies after blpop: never re-enqueue a nil.
  push(message) if message
  message
end
#unpop(message) ⇒ Object
90 91 92 |
# File 'lib/redis_queue.rb', line 90
#
# Runs the :unpop script for +message+. Going by the script text, that pushes
# the message back onto the head of the list and removes it from
# "<id>_in_use" (assuming `script` forwards its arguments as ARGV in order —
# the helper is not shown here).
#
# @param message [Object] the in-use entry to return to the queue
# @return [Object] whatever `script` returns
def unpop(message)
  script :unpop, @id, message
end