Class: RedisQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/redis_queue.rb

Defined Under Namespace

Classes: RedisConnection

Constant Summary collapse

# Lua source shared by the :push and :repush scripts.
#
# ARGV[1] is the list key, ARGV[2] the message and ARGV[3] the priority flag
# (compared against the string 'true').
# * flag set: the message is LINSERTed before the first '' element of the
#   list; when LINSERT answers -1 or 0 the script LPUSHes a '' element and
#   then LPUSHes the message in front of it.
# * otherwise: the message is RPUSHed onto the list.
PUSH_CODE = "
if ARGV[3] == 'true' then
  local insert = redis.call('linsert', ARGV[1], 'before', '', ARGV[2])
  if insert == -1 or insert == 0 then
    redis.call('lpush', ARGV[1], '')
    redis.call('lpush', ARGV[1], ARGV[2])
  end
else
  redis.call('rpush', ARGV[1], ARGV[2])
end".freeze

# Lua scripts keyed by name. In every script ARGV[1] is the list key.
SCRIPTS = {
  # See PUSH_CODE.
  push: PUSH_CODE,
  # LPOPs until the popped value is not '', then (when a value was obtained)
  # HSETs it in ARGV[1]..'_in_use' with ARGV[2] as the value. Returns the message.
  nonblpop: "
    local message = ''
    while message == '' do
      message = redis.call('lpop', ARGV[1])
    end
    if message then
      redis.call('hset', ARGV[1]..'_in_use', message, ARGV[2])
    end
    return message
  ",
  # LPOPs until the popped value is not '', RPUSHes it back onto the same
  # list when a value was obtained, and returns it.
  touch: "
    local message = ''
    while message == '' do
      message = redis.call('lpop', ARGV[1])
    end
    if message then
      redis.call('rpush', ARGV[1], message)
    end
    return message
  ",
  # Runs PUSH_CODE, then HDELs ARGV[2] from ARGV[1]..'_in_use'.
  repush: "
    #{PUSH_CODE}
    redis.call('hdel', ARGV[1]..'_in_use', ARGV[2])
  ",
  # HSETs ARGV[2] => ARGV[3] in ARGV[1]..'_failed' and HDELs ARGV[2] from ARGV[1]..'_in_use'.
  fail: "
    redis.call('hset', ARGV[1]..'_failed', ARGV[2], ARGV[3])
    redis.call('hdel', ARGV[1]..'_in_use', ARGV[2])
  ",
  # HSETs ARGV[2] => ARGV[3] in ARGV[1]..'_done' and HDELs ARGV[2] from ARGV[1]..'_in_use'.
  done: "
    redis.call('hset', ARGV[1]..'_done', ARGV[2], ARGV[3])
    redis.call('hdel', ARGV[1]..'_in_use', ARGV[2])
  ",
  # LPUSHes ARGV[2] back onto the list and HDELs it from ARGV[1]..'_in_use'.
  unpop: "
    redis.call('lpush', ARGV[1], ARGV[2])
    redis.call('hdel', ARGV[1]..'_in_use', ARGV[2])
  ",
  # For every field of the hash ARGV[2] whose value is numerically below
  # ARGV[3]: LPUSHes the field onto the list and HDELs it from the hash.
  # Uses the Lua length operator (#) rather than the deprecated table.getn.
  init_from: "
    local vals = redis.call('hkeys', ARGV[2])
    for i = 1, #vals do
      local timestamp = redis.call('hget', ARGV[2], vals[i])
      if tonumber(timestamp) < tonumber(ARGV[3]) then
        redis.call('lpush', ARGV[1], vals[i])
        redis.call('hdel', ARGV[2], vals[i])
      end
    end
    "
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ RedisQueue

Returns a new instance of RedisQueue.



63
64
65
66
67
68
69
# File 'lib/redis_queue.rb', line 63

# Builds a queue handle.
#
# @param args [Hash] options; :id (default :messages) is stored as the queue
#   id, every remaining entry (including :url, default
#   'redis://localhost:6379/0') is handed to RedisConnection.new.
def initialize(args = {})
  defaults = { id: :messages, url: 'redis://localhost:6379/0' }
  options  = defaults.merge(args)

  # :id is consumed here so only connection settings reach RedisConnection.
  @id = options.delete(:id)

  # Two separate connections are created from the same options.
  @redis          = RedisConnection.new(options)
  @redis_blocking = RedisConnection.new(options)

  load_scripts
end

Instance Method Details

#clear ⇒ Object



167
168
169
170
171
172
173
174
# File 'lib/redis_queue.rb', line 167

# Deletes the queue key and its "_in_use", "_done" and "_failed" companions,
# one DEL call per key, in that order.
#
# @return whatever @redis.run yields back for the last DEL call
def clear
  companion_keys = %w[in_use done failed].map { |suffix| "#{@id}_#{suffix}" }
  targets = [@id, *companion_keys]

  @redis.run do |conn|
    # `.last` keeps the block value equal to the final DEL's reply.
    targets.map { |key| conn.del(key) }.last
  end
end

#done(message) ⇒ Object



86
87
88
# File 'lib/redis_queue.rb', line 86

# Runs the :done script for +message+.
#
# @param message the message passed as the script's second argument
# @return the result of `script`
def done(message)
  stamp = now # `now` is defined outside this view; presumably a timestamp
  script(:done, @id, message, stamp)
end

#done_list ⇒ Object



141
142
143
# File 'lib/redis_queue.rb', line 141

# Reads every field/value of the "<id>_done" hash via HGETALL.
#
# @return [Hash] the reply wrapped with Hash[]
def done_list
  entries = @redis.run do |conn|
    conn.hgetall("#{@id}_done")
  end
  Hash[entries]
end

#done_size ⇒ Object



125
126
127
# File 'lib/redis_queue.rb', line 125

# Number of fields in the "<id>_done" hash (HLEN), coerced with to_i.
#
# @return [Integer]
def done_size
  reply = @redis.run do |conn|
    conn.hlen("#{@id}_done")
  end
  reply.to_i
end

#fail(message) ⇒ Object



82
83
84
# File 'lib/redis_queue.rb', line 82

# Runs the :fail script for +message+.
#
# @param message the message passed as the script's second argument
# @return the result of `script`
def fail(message)
  stamp = now # `now` is defined outside this view; presumably a timestamp
  script(:fail, @id, message, stamp)
end

#failed_list ⇒ Object



145
146
147
# File 'lib/redis_queue.rb', line 145

# Reads every field/value of the "<id>_failed" hash via HGETALL.
#
# @return [Hash] the reply wrapped with Hash[]
def failed_list
  entries = @redis.run do |conn|
    conn.hgetall("#{@id}_failed")
  end
  Hash[entries]
end

#failed_size ⇒ Object



129
130
131
# File 'lib/redis_queue.rb', line 129

# Number of fields in the "<id>_failed" hash (HLEN), coerced with to_i.
#
# @return [Integer]
def failed_size
  reply = @redis.run do |conn|
    conn.hlen("#{@id}_failed")
  end
  reply.to_i
end

#forget(message) ⇒ Object



98
99
100
# File 'lib/redis_queue.rb', line 98

# Removes +message+ from the "<id>_in_use" hash (HDEL) without touching the
# queue list.
#
# @param message the hash field to delete
# @return whatever @redis.run yields back for the HDEL call
def forget(message)
  in_use_key = "#{@id}_in_use"
  @redis.run do |conn|
    conn.hdel(in_use_key, message)
  end
end

#in_use_list ⇒ Object



149
150
151
# File 'lib/redis_queue.rb', line 149

# Reads every field/value of the "<id>_in_use" hash via HGETALL.
#
# @return [Hash] the reply wrapped with Hash[]
def in_use_list
  entries = @redis.run do |conn|
    conn.hgetall("#{@id}_in_use")
  end
  Hash[entries]
end

#in_use_size ⇒ Object



133
134
135
# File 'lib/redis_queue.rb', line 133

# Number of fields in the "<id>_in_use" hash (HLEN), coerced with to_i.
#
# @return [Integer]
def in_use_size
  reply = @redis.run do |conn|
    conn.hlen("#{@id}_in_use")
  end
  reply.to_i
end

#list ⇒ Object



137
138
139
# File 'lib/redis_queue.rb', line 137

# Fetches the whole queue list (LRANGE from index 0 to -1).
#
# @return whatever @redis.run yields back for the LRANGE call
def list
  @redis.run do |conn|
    conn.lrange(@id, 0, -1)
  end
end

#pop(block: true) ⇒ Object



71
72
73
74
75
76
# File 'lib/redis_queue.rb', line 71

# Takes the next message off the queue.
#
# @param block [Boolean] when falsy, delegates entirely to `nonblpop`;
#   otherwise obtains the message from `blpop`
# @return the message, or whatever falsy value `blpop` produced
def pop(block: true)
  if block
    message = blpop
    if message
      # Record the message in the "<id>_in_use" hash with the value of `now`.
      @redis.run { |conn| conn.hset("#{@id}_in_use", message, now) }
    end
    message
  else
    nonblpop
  end
end


160
161
162
163
164
165
# File 'lib/redis_queue.rb', line 160

# Writes four lines to stdout showing the results of list, in_use_list,
# failed_list and done_list, each prefixed with the queue id.
#
# @return [nil]
def print_contents
  [
    ['enqueued: ', :list],
    ['in use:   ', :in_use_list],
    ['failed:   ', :failed_list],
    ['done:     ', :done_list]
  ].each do |label, reader|
    # Each reader is invoked right before its line is printed.
    puts "#{@id} #{label}#{send(reader)}"
  end
  nil
end


153
154
155
156
157
158
# File 'lib/redis_queue.rb', line 153

# Writes four lines to stdout showing the results of size, in_use_size,
# failed_size and done_size, each prefixed with the queue id.
#
# @return [nil]
def print_stats
  [
    ['enqueued: ', :size],
    ['in use:   ', :in_use_size],
    ['failed:   ', :failed_size],
    ['done:     ', :done_size]
  ].each do |label, counter|
    # Each counter is invoked right before its line is printed.
    puts "#{@id} #{label}#{send(counter)}"
  end
  nil
end

#push(message, priority = false) ⇒ Object



78
79
80
# File 'lib/redis_queue.rb', line 78

# Runs the :push script for +message+.
#
# @param message the message passed as the script's second argument
# @param priority passed as the script's third argument (default false)
# @return the result of `script`
def push(message, priority = false)
  script(:push, @id, message, priority)
end

#remove(message) ⇒ Object



102
103
104
# File 'lib/redis_queue.rb', line 102

# Removes +message+ from the queue list with LREM and a count of 0.
#
# @param message the list element to remove
# @return whatever @redis.run yields back for the LREM call
def remove(message)
  @redis.run do |conn|
    conn.lrem(@id, 0, message)
  end
end

#repush(message, priority = false) ⇒ Object



94
95
96
# File 'lib/redis_queue.rb', line 94

# Runs the :repush script for +message+.
#
# @param message the message passed as the script's second argument
# @param priority passed as the script's third argument (default false)
# @return the result of `script`
def repush(message, priority = false)
  script(:repush, @id, message, priority)
end

#reset(older_than: nil) ⇒ Object



113
114
115
# File 'lib/redis_queue.rb', line 113

# Calls `init_from` with the "<id>_in_use" key and +older_than+.
#
# @param older_than forwarded unchanged as the second argument (default nil)
# @return the result of `init_from`
def reset(older_than: nil)
  in_use_key = "#{@id}_in_use"
  init_from(in_use_key, older_than)
end

#restart ⇒ Object



117
118
119
# File 'lib/redis_queue.rb', line 117

# Calls `init_from` with the "<id>_done" key as its only argument.
#
# @return the result of `init_from`
def restart
  done_key = "#{@id}_done"
  init_from(done_key)
end

#size ⇒ Object



121
122
123
# File 'lib/redis_queue.rb', line 121

# Length of the queue list (LLEN), coerced with to_i.
#
# @return [Integer]
def size
  reply = @redis.run do |conn|
    conn.llen(@id)
  end
  reply.to_i
end

#touch(block: true) ⇒ Object



106
107
108
109
110
111
# File 'lib/redis_queue.rb', line 106

# Takes the next message and puts it straight back on the queue.
#
# @param block [Boolean] when falsy, delegates entirely to `nonbltouch`;
#   otherwise obtains the message from `blpop` and re-enqueues it with `push`
# @return the message, or whatever falsy value `blpop` produced
def touch(block: true)
  return nonbltouch unless block

  message = blpop
  # Guard mirrors `pop`: when `blpop` yields nothing there is nothing to
  # re-enqueue, and pushing nil would add a bogus element to the list.
  push(message) if message
  message
end

#unpop(message) ⇒ Object



90
91
92
# File 'lib/redis_queue.rb', line 90

# Runs the :unpop script for +message+.
#
# @param message the message passed as the script's second argument
# @return the result of `script`
def unpop(message)
  script(:unpop, @id, message)
end