Class: Redis::Deque

Inherits:
Object
  • Object
show all
Defined in:
lib/redis/deque.rb

Constant Summary collapse

VERSION =
'0.2.0'

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_name, options = {}) ⇒ Deque

Returns a new instance of Deque.

Raises:

  • (ArgumentError)


11
12
13
14
15
16
17
18
19
20
21
# File 'lib/redis/deque.rb', line 11

# Builds a deque backed by two Redis lists: the main queue and a
# "process" queue that holds messages that were popped but not yet committed.
#
# @param queue_name [String] non-empty name of the main queue list
# @param options [Hash] optional settings; the hash is only read, never modified
# @option options [Object] :redis client to use (defaults to +Redis.current+)
# @option options [String] :process_queue_name non-empty name of the process
#   list (defaults to "<queue_name>_process"); must differ from +queue_name+
# @option options [Integer] :timeout timeout handed to the blocking pop
#   (defaults to 0)
# @raise [ArgumentError] if either queue name is not a non-empty String, or
#   if both names are equal
def initialize(queue_name, options = {})
  unless queue_name.is_a?(String) && !queue_name.empty?
    raise ArgumentError, 'queue_name must be a non-empty string'
  end

  # An explicitly supplied process queue name is validated even when it is nil.
  if options.key?(:process_queue_name)
    candidate = options[:process_queue_name]
    unless candidate.is_a?(String) && !candidate.empty?
      raise ArgumentError, 'process_queue_name must be a non-empty string'
    end
  end

  if options[:process_queue_name] == queue_name
    raise ArgumentError, 'queue_name and process_queue_name must be different'
  end

  @redis = options[:redis] || Redis.current
  @queue_name = queue_name
  @process_queue_name = options[:process_queue_name] || "#{queue_name}_process"
  @last_message = nil
  # Read-only default: `||` instead of `||=` so the caller's hash is not
  # written to (which also made frozen option hashes raise).
  @timeout = options[:timeout] || 0
end

Class Method Details

.version ⇒ Object



7
8
9
# File 'lib/redis/deque.rb', line 7

# Human-readable version banner for the library.
#
# @return [String] "redis-deque version " followed by the VERSION constant
def self.version
  format('redis-deque version %s', VERSION)
end

Instance Method Details

#clear(clear_process_queue = false) ⇒ Object



27
28
29
30
# File 'lib/redis/deque.rb', line 27

# Deletes the main queue key and, on request, the process queue key too.
#
# @param clear_process_queue [Boolean] when truthy, the process queue key is
#   deleted as well
# @return [nil, Object] nil when only the main queue was deleted, otherwise
#   the result of deleting the process queue key
def clear(clear_process_queue = false)
  @redis.del(@queue_name)
  return unless clear_process_queue

  @redis.del(@process_queue_name)
end

#commit(message) ⇒ Object



53
54
55
# File 'lib/redis/deque.rb', line 53

# Acknowledges a message by removing it from the process queue.
#
# @param message [Object] value to remove from the process queue list
# @return [Object] whatever the client's +lrem+ call returns
def commit(message)
  # In Redis LREM, a count of 0 removes every element equal to the value.
  remove_all = 0
  @redis.lrem(@process_queue_name, remove_all, message)
end

#commit_all ⇒ Object



61
62
63
# File 'lib/redis/deque.rb', line 61

# Acknowledges every in-flight message at once by deleting the whole
# process queue key.
#
# @return [Object] whatever the client's +del+ call returns
def commit_all
  in_flight_key = @process_queue_name
  @redis.del(in_flight_key)
end

#commit_last ⇒ Object



57
58
59
# File 'lib/redis/deque.rb', line 57

# Acknowledges the message most recently stored in @last_message (set by
# #pop) by delegating to #commit.
#
# @return [Object] the result of #commit
def commit_last
  commit(@last_message)
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


32
33
34
# File 'lib/redis/deque.rb', line 32

# Tells whether the main queue currently holds no messages, based on #length.
#
# @return [Boolean] true when #length is zero or less
def empty?
  !length.positive?
end

#length ⇒ Object Also known as: size



23
24
25
# File 'lib/redis/deque.rb', line 23

# Number of messages waiting in the main queue, as reported by LLEN.
#
# @return [Object] whatever the client's +llen+ call returns
def length
  @redis.llen(@queue_name)
end

#pop(non_block = false) ⇒ Object Also known as: dec, shift



44
45
46
47
48
49
50
51
# File 'lib/redis/deque.rb', line 44

# Moves one message from the main queue onto the process queue and
# remembers it in @last_message, so that #commit_last can acknowledge it.
#
# @param non_block [Boolean] when truthy, use RPOPLPUSH (returns at once);
#   otherwise use BRPOPLPUSH with the configured @timeout
# @return [Object, nil] the value returned by the client call, which is also
#   stored in @last_message
def pop(non_block = false)
  source = @queue_name
  destination = @process_queue_name

  return @last_message = @redis.rpoplpush(source, destination) if non_block

  @last_message = @redis.brpoplpush(source, destination, @timeout)
end

#process(non_block = false, timeout = nil) ⇒ Object



65
66
67
68
69
70
71
72
73
# File 'lib/redis/deque.rb', line 65

# Repeatedly pops messages and hands each one to the given block. A message
# is committed (removed from the process queue) when the block returns a
# truthy value.
#
# The loop stops after a pop returns nil, or, in non-blocking mode, once the
# main queue is empty. The nil result of the final pop is still yielded to
# the block, but it is never committed.
#
# @param non_block [Boolean] passed through to #pop
# @param timeout [Integer, nil] when not nil, replaces the instance's
#   @timeout (the new value stays in effect after this call)
# @yieldparam message [Object, nil] the popped message, or nil when the pop
#   produced nothing
# @yieldreturn [Boolean] truthy to commit the message
# @return [nil]
def process(non_block = false, timeout = nil)
  @timeout = timeout unless timeout.nil?
  loop do
    message = pop(non_block)
    ret = yield message if block_given?
    # A nil message means nothing was popped; committing it would issue an
    # LREM for a nil value against the process queue.
    commit_last if ret && !message.nil?
    break if message.nil? || (non_block && empty?)
  end
end

#push(obj) ⇒ Object Also known as: enc, <<



36
37
38
# File 'lib/redis/deque.rb', line 36

# Enqueues a message at the head of the main queue list (LPUSH).
#
# @param obj [Object] message to enqueue
# @return [Object] whatever the client's +lpush+ call returns
def push(obj)
  target = @queue_name
  @redis.lpush(target, obj)
end

#refill ⇒ Object



75
76
77
78
79
80
# File 'lib/redis/deque.rb', line 75

# Drains the process queue back into the main queue: each message is popped
# from the head of the process list (LPOP) and re-queued through #unshift,
# until LPOP returns nothing.
#
# @return [true]
def refill
  loop do
    pending = @redis.lpop(@process_queue_name)
    break unless pending

    unshift(pending)
  end
  true
end

#unshift(obj) ⇒ Object



40
41
42
# File 'lib/redis/deque.rb', line 40

# Puts a message at the tail of the main queue list (RPUSH), the end that
# #pop reads from.
#
# @param obj [Object] message to insert
# @return [Object] whatever the client's +rpush+ call returns
def unshift(obj)
  target = @queue_name
  @redis.rpush(target, obj)
end