Module: Resque::Uniqueue

Defined in:
lib/resque/uniqueue.rb

Instance Method Summary collapse

Instance Method Details

#confirm_compatible_redis_version ⇒ Object



123
124
125
126
127
128
129
130
# File 'lib/resque/uniqueue.rb', line 123

# Checks that the connected Redis server is new enough for uniqueue.
#
# Reads "redis_version" from `redis.info` and compares its major and minor
# components against 2.6. Every script-based operation in this module goes
# through `evalsha`.
#
# @return [nil] when the server reports version 2.6 or newer
# @raise [RuntimeError] when the reported version is older than 2.6
def confirm_compatible_redis_version
  running = redis.info["redis_version"]
  major, minor = running.split(".").map(&:to_i)
  return if major > 2 || (major == 2 && minor >= 6)

  # TODO: raise a specific exception class
  raise "Redis version must be at least 2.6.0 you are running #{running}"
end

#confirm_unique_queue_validity(queue) ⇒ Object

If the queue, its uniqueness set, and its start_at list differ in size, something is very wrong and we should fail loudly.



88
89
90
91
92
93
# File 'lib/resque/uniqueue.rb', line 88

# Fails loudly when a unique queue's bookkeeping is out of step.
#
# Runs the script from `queue_and_set_length_equal_eval_sha` with `queue` as
# its only key and treats a reply of 1 as "in sync".
#
# @param queue [String] the Redis key of the queue list, passed to the script as-is
# @return [true] when the script replies with 1
# @raise [RuntimeError] for any other reply
def confirm_unique_queue_validity(queue)
  in_sync = redis.evalsha(queue_and_set_length_equal_eval_sha, [queue]) == 1
  unless in_sync
    # TODO: raise a specific exception class
    raise "Make sure your queues are empty before you start using uniqueue"
  end

  true
end

#load_script(script) ⇒ Object



83
84
85
# File 'lib/resque/uniqueue.rb', line 83

# Sends a script to Redis through `redis.script(:load, ...)`.
#
# @param script [String] the script source
# @return [Object] whatever `redis.script` returns; the `*_eval_sha` methods
#   in this module cache that value and later hand it to `evalsha`
def load_script(script)
  redis.script(:load, script)
end

#pop(queue) ⇒ Object



10
11
12
# File 'lib/resque/uniqueue.rb', line 10

# Pops the next item from `queue`.
#
# Unique queues are served by `pop_unique`; every other queue falls through
# to the inherited implementation with the same argument.
#
# @param queue the queue name
# @return [Object] the result of `pop_unique` or of the inherited `pop`
def pop(queue)
  return pop_unique(queue) if unique_queue?(queue)

  super
end

#pop_unique(queue) ⇒ Object



27
28
29
30
31
32
33
34
35
# File 'lib/resque/uniqueue.rb', line 27

# Pops one job from a unique queue.
#
# The consistency check runs first, then the pop script is evaluated against
# the "queue:<name>" key. The script's reply is read as a two-element array:
# the encoded job and its recorded start_at value.
#
# @param queue the bare queue name (without the "queue:" prefix)
# @return [Object, nil] the decoded job, or nil when the script returned no job;
#   "start_at" is filled in from the reply only when the job does not already carry one
# @raise [RuntimeError] propagated from `confirm_unique_queue_validity`
def pop_unique(queue)
  list_key = "queue:#{queue}"
  confirm_unique_queue_validity(list_key)

  reply = redis.evalsha(pop_unique_eval_sha, [list_key])
  payload = reply[0]
  return nil unless payload

  decode(payload).tap do |job|
    job["start_at"] ||= reply[1].to_i
  end
end

#pop_unique_eval_sha ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/resque/uniqueue.rb', line 51

# Handle for the "pop" script, loaded on first use and then cached in
# @pop_unique_eval_sha.
#
# The script left-pops KEYS[1] and its ":start_at" companion list, removes
# the popped job from the ":uniqueue" set when there was one, and returns
# both popped values.
#
# NOTE(review): the cached value is never refreshed here; presumably it goes
# stale if the server forgets the script -- confirm how callers recover.
#
# @return [Object] the value returned by `load_script` for this script
def pop_unique_eval_sha
  return @pop_unique_eval_sha if @pop_unique_eval_sha

  @pop_unique_eval_sha = load_script(<<-LUA)
    local queue_name = KEYS[1]
    local uniqueue_name = queue_name..':uniqueue'
    local start_at_name = queue_name..':start_at'
    local results = {}
    results[1] = redis.call('lpop', queue_name)
    results[2] = redis.call('lpop', start_at_name)
    if results[1] then
      redis.call('srem', uniqueue_name, results[1])
    end
    return results
  LUA
end

#push(queue, item) ⇒ Object



6
7
8
# File 'lib/resque/uniqueue.rb', line 6

# Pushes `item` onto `queue`.
#
# Unique queues are served by `push_unique`; every other queue falls through
# to the inherited implementation with the same arguments.
#
# @param queue the queue name
# @param item the job payload
# @return [Object] the result of `push_unique` or of the inherited `push`
def push(queue, item)
  return push_unique(queue, item) if unique_queue?(queue)

  super
end

#push_unique(queue, item, time = Time.now.utc.to_i) ⇒ Object



20
21
22
23
24
25
# File 'lib/resque/uniqueue.rb', line 20

# Enqueues `item` on a unique queue.
#
# Registers the queue through `watch_queue`, then evaluates the push script
# with "queue:<name>" as its key and the encoded item plus `time` as its
# arguments.
#
# @param queue the bare queue name (without the "queue:" prefix)
# @param item the job payload; it is passed through `encode` before being sent
# @param time [Integer] value recorded alongside the job (defaults to the
#   current UTC epoch seconds)
# @return [Object] the reply from `redis.evalsha`
def push_unique(queue, item, time = Time.now.utc.to_i)
  watch_queue(queue)

  redis.evalsha(push_unique_eval_sha, ["queue:#{queue}"], [encode(item), time])
end

#push_unique_eval_sha ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/resque/uniqueue.rb', line 37

# Handle for the "push" script, loaded on first use and then cached in
# @push_unique_eval_sha.
#
# The script adds ARGV[1] to the ":uniqueue" set for KEYS[1]. Only when the
# add reports a new member does it append ARGV[2] to the ":start_at" list and
# ARGV[1] to the queue list, returning the result of that last rpush.
# Otherwise it returns false without touching either list.
#
# NOTE(review): the cached value is never refreshed here; presumably it goes
# stale if the server forgets the script -- confirm how callers recover.
#
# @return [Object] the value returned by `load_script` for this script
def push_unique_eval_sha
  return @push_unique_eval_sha if @push_unique_eval_sha

  @push_unique_eval_sha = load_script(<<-LUA)
    local queue_name = KEYS[1]
    local uniqueue_name = queue_name..':uniqueue'
    local start_at_name = queue_name..':start_at'
    local not_in_set = redis.call('sadd', uniqueue_name , ARGV[1])
    if not_in_set == 1 then
      redis.call('rpush', start_at_name, ARGV[2])
      return redis.call('rpush', queue_name, ARGV[1])
    end
    return false
  LUA
end

#queue_and_set_length_equal_eval_sha ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/resque/uniqueue.rb', line 66

# Handle for the consistency-check script, loaded on first use and then
# cached in @queue_and_set_length_equal_eval_sha.
#
# The script measures the queue list at KEYS[1], its ":uniqueue" set and its
# ":start_at" list. It returns true only when all three sizes are equal, and
# false otherwise.
#
# NOTE(review): the cached value is never refreshed here; presumably it goes
# stale if the server forgets the script -- confirm how callers recover.
#
# @return [Object] the value returned by `load_script` for this script
def queue_and_set_length_equal_eval_sha
  return @queue_and_set_length_equal_eval_sha if @queue_and_set_length_equal_eval_sha

  @queue_and_set_length_equal_eval_sha = load_script(<<-LUA)
    local queue_name = KEYS[1]
    local uniqueue_name = queue_name..':uniqueue'
    local start_at_name = queue_name..':start_at'
    local queue_size = redis.call('llen', queue_name)
    local uniqueue_size = redis.call('scard', uniqueue_name)
    local start_at_size = redis.call('llen', start_at_name)
    if queue_size == uniqueue_size then
      if queue_size == start_at_size then
        return true
      end
    end
    return false
  LUA
end

#remove_queue(queue) ⇒ Object



14
15
16
17
18
# File 'lib/resque/uniqueue.rb', line 14

# Removes a queue along with the two companion keys uniqueue keeps for it.
#
# The inherited `remove_queue` runs first; afterwards the ":uniqueue" and
# ":start_at" keys under "queue:<name>" are deleted, in that order.
#
# @param queue the bare queue name (without the "queue:" prefix)
# @return [Object] the reply of the final `redis.del` call
def remove_queue(queue)
  super(queue)

  prefix = "queue:#{queue}"
  redis.del("#{prefix}:uniqueue")
  redis.del("#{prefix}:start_at")
end

#unique_queue?(queue) ⇒ Boolean

Is this queue a unique queue? If uniqueue is turned on and no queues are set, it assumes all queues are unique.

Returns:

  • (Boolean)


97
98
99
100
# File 'lib/resque/uniqueue.rb', line 97

# Tells whether `queue` should be handled as a unique queue.
#
# When uniqueue is switched off no queue is unique. When it is on and no
# explicit list has been set (`unique_queues` is nil), every queue is unique.
# Otherwise the queue must appear in the configured list.
#
# Names are compared as strings. `push` and `pop` both route through this
# predicate, so a queue given as :high on one side and "high" on the other
# must get the same answer; otherwise the two sides would treat the same
# Redis list differently.
#
# @param queue [String, Symbol] the queue name
# @return [Boolean]
def unique_queue?(queue)
  return false unless unique_queues?

  configured = unique_queues
  return true unless configured

  wanted = queue.to_s
  # Array() also accepts a Set or a single name instead of an Array.
  Array(configured).any? { |name| name.to_s == wanted }
end

#unique_queues ⇒ Object

list the unique queues



103
104
105
# File 'lib/resque/uniqueue.rb', line 103

# Returns the explicit list of unique queues, exactly as it was stored by
# `unique_queues=`.
#
# @return [Object, nil] the stored value; nil when no list has been assigned,
#   which `unique_queue?` treats as "every queue is unique"
def unique_queues
  @unique_queues
end

#unique_queues! ⇒ Object

turn on unique queues



113
114
115
116
# File 'lib/resque/uniqueue.rb', line 113

# Switches unique-queue handling on.
#
# @return [true]
# @raise [RuntimeError] propagated from `confirm_compatible_redis_version`
def unique_queues!
  # Runs before the flag is set, so a failed check leaves uniqueue disabled.
  confirm_compatible_redis_version
  @unique_queues_enabled = true
end

#unique_queues=(unique_queues) ⇒ Object

set a specific list of unique queues



108
109
110
# File 'lib/resque/uniqueue.rb', line 108

# Stores a specific list of queues to treat as unique.
#
# The value is kept as given. Assigning a list does not by itself switch
# uniqueue on; `unique_queue?` still checks `unique_queues?` first.
#
# @param unique_queues the queue names to treat as unique, or nil to clear the list
def unique_queues=(unique_queues)
  @unique_queues = unique_queues
end

#unique_queues? ⇒ Boolean

are unique queues turned on?

Returns:

  • (Boolean)


119
120
121
# File 'lib/resque/uniqueue.rb', line 119

# Reports whether unique-queue handling has been switched on.
#
# @return [Boolean] true when @unique_queues_enabled holds a truthy value,
#   false otherwise (including when it was never set)
def unique_queues?
  @unique_queues_enabled ? true : false
end