Module: Resque::Uniqueue
- Defined in:
- lib/resque/uniqueue.rb
Instance Method Summary collapse
- #confirm_compatible_redis_version ⇒ Object
-
#confirm_unique_queue_validity(queue) ⇒ Object
If the queue and set sizes differ, something is very wrong and we should fail loudly.
- #load_script(script) ⇒ Object
- #pop(queue) ⇒ Object
- #pop_unique(queue) ⇒ Object
- #pop_unique_eval_sha ⇒ Object
- #push(queue, item) ⇒ Object
- #push_unique(queue, item, time = Time.now.utc.to_i) ⇒ Object
- #push_unique_eval_sha ⇒ Object
- #queue_and_set_length_equal_eval_sha ⇒ Object
- #remove_queue(queue) ⇒ Object
-
#unique_queue?(queue) ⇒ Boolean
Is this queue a unique queue? If you have uniqueue turned on and no queues are set, it assumes all queues are unique.
-
#unique_queues ⇒ Object
list the unique queues.
-
#unique_queues! ⇒ Object
turn on unique queues.
-
#unique_queues=(unique_queues) ⇒ Object
set a specific list of unique queues.
-
#unique_queues? ⇒ Boolean
Are unique queues turned on?
Instance Method Details
#confirm_compatible_redis_version ⇒ Object
123 124 125 126 127 128 129 130 |
# File 'lib/resque/uniqueue.rb', line 123
#
# Checks the "redis_version" entry of +redis.info+ and raises unless it is
# at least 2.6.0. Only the major and minor components are compared; a missing
# minor component (e.g. "3") is treated as 0.
#
# NOTE(review): 2.6 is presumably required because the other methods in this
# module rely on EVALSHA / SCRIPT LOAD — confirm against the Redis docs.
#
# @return [nil] when the server version is acceptable
# @raise [RuntimeError] when the server is older than 2.6.0
def confirm_compatible_redis_version
  redis_version = redis.info["redis_version"]
  major, minor = redis_version.split(".").map(&:to_i)
  # A nil minor would otherwise blow up on `nil < 6` when major == 2.
  minor = minor.to_i
  return unless major < 2 || (major == 2 && minor < 6)

  # TODO: raise a specific exception
  raise "Redis version must be at least 2.6.0 you are running #{redis_version}"
end
#confirm_unique_queue_validity(queue) ⇒ Object
If the queue and set sizes differ, something is very wrong and we should fail loudly.
88 89 90 91 92 93 |
# File 'lib/resque/uniqueue.rb', line 88
#
# Runs the length-comparison script (see queue_and_set_length_equal_eval_sha)
# with +queue+ as its only key and fails loudly unless the script answers 1.
#
# @param queue [String] the Redis key of the queue list
# @return [true] when the script reports 1
# @raise [RuntimeError] for any other reply
def confirm_unique_queue_validity(queue)
  lengths_match = redis.evalsha(queue_and_set_length_equal_eval_sha, [queue])
  # TODO: raise a specific exception
  raise "Make sure your queues are empty before you start using uniqueue" unless lengths_match == 1

  true
end
#load_script(script) ⇒ Object
83 84 85 |
# File 'lib/resque/uniqueue.rb', line 83
#
# Hands +script+ to Redis via +redis.script(:load, ...)+ and returns whatever
# that call returns (presumably the script's SHA digest — confirm against the
# Redis client documentation).
#
# @param script [String] script source to load
def load_script(script)
  redis.script(:load, script)
end
#pop(queue) ⇒ Object
10 11 12 |
# File 'lib/resque/uniqueue.rb', line 10
#
# Pops from +queue+. Unique queues are served by pop_unique; every other
# queue is delegated to the parent implementation through +super+.
#
# @param queue the queue name
def pop(queue)
  return pop_unique(queue) if unique_queue?(queue)

  super
end
#pop_unique(queue) ⇒ Object
27 28 29 30 31 32 33 34 35 |
# File 'lib/resque/uniqueue.rb', line 27
#
# Pops the next job from a unique queue.
#
# The queue name is prefixed with "queue:", the queue is validated with
# confirm_unique_queue_validity, and the pop script is then run. The reply is
# indexed as [payload, start_at]; a falsy payload yields nil. Otherwise the
# payload is decoded and, if the job has no truthy "start_at" entry, it is
# filled in from the second reply element converted with +to_i+.
#
# @param queue the queue name (without the "queue:" prefix)
# @return the decoded job, or nil when nothing was popped
def pop_unique(queue)
  key = "queue:#{queue}"
  confirm_unique_queue_validity(key)

  reply = redis.evalsha(pop_unique_eval_sha, [key])
  payload = reply[0]
  return nil unless payload

  job = decode(payload)
  job["start_at"] ||= reply[1].to_i
  job
end
#pop_unique_eval_sha ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/resque/uniqueue.rb', line 51
#
# Returns the memoized result of loading the "pop" Lua script through
# load_script; the script is loaded at most once per object.
#
# The script left-pops one element from the queue list (KEYS[1]) and one from
# the matching ":start_at" list, removes the popped queue element from the
# ":uniqueue" set when there was one, and returns both popped values.
def pop_unique_eval_sha
  return @pop_unique_eval_sha if @pop_unique_eval_sha

  @pop_unique_eval_sha = load_script(<<-LUA)
    local queue_name = KEYS[1]
    local uniqueue_name = queue_name..':uniqueue'
    local start_at_name = queue_name..':start_at'
    local results = {}
    results[1] = redis.call('lpop', queue_name)
    results[2] = redis.call('lpop', start_at_name)
    if results[1] then
      redis.call('srem', uniqueue_name, results[1])
    end
    return results
  LUA
end
#push(queue, item) ⇒ Object
6 7 8 |
# File 'lib/resque/uniqueue.rb', line 6
#
# Pushes +item+ onto +queue+. Unique queues go through push_unique; every
# other queue is delegated to the parent implementation through +super+.
#
# @param queue the queue name
# @param item the job payload to enqueue
def push(queue, item)
  return push_unique(queue, item) if unique_queue?(queue)

  super
end
#push_unique(queue, item, time = Time.now.utc.to_i) ⇒ Object
20 21 22 23 24 25 |
# File 'lib/resque/uniqueue.rb', line 20
#
# Enqueues +item+ on a unique queue by running the push script with the
# "queue:"-prefixed key and the arguments [encoded item, time].
# watch_queue is called with the unprefixed name first.
#
# @param queue the queue name (without the "queue:" prefix)
# @param item the job payload; passed through +encode+ before being sent
# @param time [Integer] value stored on the start_at list; defaults to the
#   current epoch seconds
# @return the reply of the push script
def push_unique(queue, item, time = Time.now.utc.to_i)
  watch_queue(queue)
  redis.evalsha(push_unique_eval_sha, ["queue:#{queue}"], [encode(item), time])
end
#push_unique_eval_sha ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/resque/uniqueue.rb', line 37
#
# Returns the memoized result of loading the "push" Lua script through
# load_script; the script is loaded at most once per object.
#
# The script adds ARGV[1] to the ":uniqueue" set. Only when SADD reports a new
# member (1) does it append ARGV[2] to the ":start_at" list and ARGV[1] to the
# queue list (KEYS[1]), returning the result of that last RPUSH. Otherwise it
# returns false.
def push_unique_eval_sha
  return @push_unique_eval_sha if @push_unique_eval_sha

  @push_unique_eval_sha = load_script(<<-LUA)
    local queue_name = KEYS[1]
    local uniqueue_name = queue_name..':uniqueue'
    local start_at_name = queue_name..':start_at'
    local not_in_set = redis.call('sadd', uniqueue_name , ARGV[1])
    if not_in_set == 1 then
      redis.call('rpush', start_at_name, ARGV[2])
      return redis.call('rpush', queue_name, ARGV[1])
    end
    return false
  LUA
end
#queue_and_set_length_equal_eval_sha ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/resque/uniqueue.rb', line 66
#
# Returns the memoized result of loading the consistency-check Lua script
# through load_script; the script is loaded at most once per object.
#
# The script compares the length of the queue list (KEYS[1]), the cardinality
# of the ":uniqueue" set and the length of the ":start_at" list. It returns
# true only when all three are equal, and false otherwise.
def queue_and_set_length_equal_eval_sha
  return @queue_and_set_length_equal_eval_sha if @queue_and_set_length_equal_eval_sha

  @queue_and_set_length_equal_eval_sha = load_script(<<-LUA)
    local queue_name = KEYS[1]
    local uniqueue_name = queue_name..':uniqueue'
    local start_at_name = queue_name..':start_at'
    local queue_size = redis.call('llen', queue_name)
    local uniqueue_size = redis.call('scard', uniqueue_name)
    local start_at_size = redis.call('llen', start_at_name)
    if queue_size == uniqueue_size then
      if queue_size == start_at_size then
        return true
      end
    end
    return false
  LUA
end
#remove_queue(queue) ⇒ Object
14 15 16 17 18 |
# File 'lib/resque/uniqueue.rb', line 14
#
# Removes +queue+ through the parent implementation, then deletes the two
# companion keys this module maintains for it: "queue:<name>:uniqueue" and
# "queue:<name>:start_at" (in that order).
#
# @param queue the queue name (without the "queue:" prefix)
# @return the reply of the final DEL (the ":start_at" key)
def remove_queue(queue)
  super(queue)
  %w[uniqueue start_at].map { |suffix| redis.del("queue:#{queue}:#{suffix}") }.last
end
#unique_queue?(queue) ⇒ Boolean
Is this queue a unique queue? If you have uniqueue turned on and no queues are set, it assumes all queues are unique.
97 98 99 100 |
# File 'lib/resque/uniqueue.rb', line 97
#
# Is this queue a unique queue?
#
# Returns false whenever unique queues are switched off. When they are on and
# no explicit list has been set, every queue counts as unique. Otherwise the
# queue must appear in the configured list.
#
# Names are compared as strings, so a Symbol queue name matches a String entry
# and vice versa; without this, the same queue could be treated as unique on
# one call and as non-unique on another. A single non-array value is wrapped
# with Array() so it is matched as one whole name, not as a substring.
#
# @param queue [String, Symbol] the queue name
# @return [Boolean]
def unique_queue?(queue)
  return false unless unique_queues?

  configured = unique_queues
  return true unless configured

  Array(configured).map(&:to_s).include?(queue.to_s)
end
#unique_queues ⇒ Object
list the unique queues
103 104 105 |
# File 'lib/resque/uniqueue.rb', line 103
#
# Lists the unique queues.
#
# @return the value last assigned through unique_queues=, or nil when
#   nothing has been assigned
def unique_queues
  @unique_queues
end
#unique_queues! ⇒ Object
turn on unique queues
113 114 115 116 |
# File 'lib/resque/uniqueue.rb', line 113
#
# Turns unique queues on.
#
# The Redis version check runs first, so if it raises the feature stays off.
#
# @return [true]
# @raise [RuntimeError] propagated from confirm_compatible_redis_version
def unique_queues!
  confirm_compatible_redis_version
  @unique_queues_enabled = true
end
#unique_queues=(unique_queues) ⇒ Object
set a specific list of unique queues
108 109 110 |
# File 'lib/resque/uniqueue.rb', line 108
#
# Sets a specific list of unique queues.
#
# The value is stored as given; no copy or validation is performed here.
#
# @param unique_queues the queue names to treat as unique
def unique_queues=(unique_queues)
  @unique_queues = unique_queues
end
#unique_queues? ⇒ Boolean
are unique queues turned on?
119 120 121 |
# File 'lib/resque/uniqueue.rb', line 119
#
# Are unique queues turned on?
#
# @return [Boolean] true once the enabled flag holds a truthy value,
#   false otherwise (including when it was never set)
def unique_queues?
  @unique_queues_enabled ? true : false
end