Class: ResqueSolo::Queue
- Inherits: Object
- Inheritance hierarchy: Object → ResqueSolo::Queue
- Defined in: lib/resque_solo/queue.rb
Class Method Summary
- .cleanup(queue) ⇒ Object
- .destroy(queue, klass, *args) ⇒ Object
- .is_unique?(item) ⇒ Boolean
- .item_ttl(item) ⇒ Object
- .lock_after_execution_period(item) ⇒ Object
- .mark_queued(queue, item) ⇒ Object
- .mark_unqueued(queue, job) ⇒ Object
- .queued?(queue, item) ⇒ Boolean
- .unique_key(queue, item) ⇒ Object
Class Method Details
.cleanup(queue) ⇒ Object
64 65 66 67 68 69 70 71 |
# File 'lib/resque_solo/queue.rb', line 64

# Deletes every Redis key matching "solo:queue:<queue>:job:*".
#
# Walks the keyspace with SCAN, deleting each non-empty batch of matching
# keys, and stops once the server hands back a cursor equal to zero.
#
# @param queue [#to_s] queue name interpolated into the match pattern
# @return [nil]
def cleanup(queue)
  pattern = "solo:queue:#{queue}:job:*"
  position = "0"
  loop do
    position, batch = redis.scan(position, match: pattern)
    redis.del(*batch) unless batch.none?
    # A zero cursor means the scan has covered the whole keyspace.
    break if position.to_i.zero?
  end
end
.destroy(queue, klass, *args) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/resque_solo/queue.rb', line 52

# Releases the uniqueness marker of every job in +queue+ whose class is
# +klass+ and, when +args+ are given, whose arguments equal +args+.
#
# Reads the whole "queue:<queue>" list, decodes each entry with
# Resque.decode and hands matching payloads to
# ResqueSolo::Queue.mark_unqueued. It does not remove anything from the
# list itself.
#
# @param queue [#to_s] queue name
# @param klass [#to_s] job class (or its name) to match against the
#   payload's "class" entry
# @param args [Array] optional job arguments; when empty every job of
#   +klass+ matches
# @return [Array] the raw entries read from the queue list
def destroy(queue, klass, *args)
  class_name = klass.to_s
  # Filter on presence of args, not on their truthiness: `args.any?` is
  # false for [nil] or [false], which would wrongly release the markers of
  # every job of this class instead of only the ones with those arguments.
  filter_by_args = !args.empty?

  redis.lrange("queue:#{queue}", 0, -1).each do |raw|
    payload = Resque.decode(raw)
    next unless payload["class"] == class_name
    next if filter_by_args && payload["args"] != args

    ResqueSolo::Queue.mark_unqueued(queue, payload)
  end
end
.is_unique?(item) ⇒ Boolean
34 35 36 37 38 |
# File 'lib/resque_solo/queue.rb', line 34

# Tells whether the class resolved for +item+ mixes in
# ::Resque::Plugins::UniqueJob.
#
# The class is obtained through const_for, which is defined outside this
# view. Any NameError raised during the lookup is treated as "not unique".
#
# @param item [Object] job payload handed to const_for
# @return [Boolean]
def is_unique?(item)
  job_class = const_for(item)
  mixins = job_class.included_modules
  mixins.include?(::Resque::Plugins::UniqueJob)
rescue NameError
  false
end
.item_ttl(item) ⇒ Object
40 41 42 43 44 |
# File 'lib/resque_solo/queue.rb', line 40

# Returns the +ttl+ reported by the class resolved for +item+.
#
# Falls back to -1 when the lookup or the +ttl+ call raises NameError.
#
# @param item [Object] job payload handed to const_for
# @return [Object] the class's ttl, or -1 on NameError
def item_ttl(item)
  job_class = const_for(item)
  job_class.ttl
rescue NameError
  -1
end
.lock_after_execution_period(item) ⇒ Object
46 47 48 49 50 |
# File 'lib/resque_solo/queue.rb', line 46

# Returns the +lock_after_execution_period+ reported by the class resolved
# for +item+.
#
# Falls back to 0 when the lookup or the call raises NameError.
#
# @param item [Object] job payload handed to const_for
# @return [Object] the class's lock period, or 0 on NameError
def lock_after_execution_period(item)
  job_class = const_for(item)
  job_class.lock_after_execution_period
rescue NameError
  0
end
.mark_queued(queue, item) ⇒ Object
11 12 13 14 15 16 17 |
# File 'lib/resque_solo/queue.rb', line 11

# Records in Redis that +item+ is queued on +queue+ by writing the value 1
# under the item's unique key. Does nothing for items that are not unique.
#
# The expiry depends on item_ttl(item):
# * positive - the key is written together with its expiry in a single
#   SET ... EX call, so a failure between two separate commands can no
#   longer leave a marker behind without a TTL;
# * zero     - the key is written and then expired with a TTL of 0, exactly
#   as before;
# * negative - the key is written without any expiry.
#
# @param queue [#to_s] queue name
# @param item [Object] job payload
# @return [Object, nil] nil when the item is not unique; otherwise the reply
#   of the last Redis command issued
def mark_queued(queue, item)
  return unless is_unique?(item)

  key = unique_key(queue, item)
  ttl = item_ttl(item)
  if ttl > 0
    # Value and expiry are set atomically.
    redis.set(key, 1, ex: ttl)
  else
    redis.set(key, 1)
    # SET rejects an expiry of 0, so keep the separate EXPIRE for that case.
    redis.expire(key, ttl) if ttl.zero?
  end
end
.mark_unqueued(queue, job) ⇒ Object
19 20 21 22 23 24 25 26 27 28 |
# File 'lib/resque_solo/queue.rb', line 19

# Releases the uniqueness marker for +job+ on +queue+.
#
# Accepts either a Resque::Job (its payload is used) or a payload directly.
# Does nothing for items that are not unique. When the item's
# lock_after_execution_period equals 0 the key is deleted; otherwise the key
# is given that period as its expiry.
#
# @param queue [#to_s] queue name
# @param job [Resque::Job, Object] job or raw payload
# @return [Object, nil] nil when the item is not unique; otherwise the reply
#   of the Redis command issued
def mark_unqueued(queue, job)
  payload =
    if job.is_a?(Resque::Job)
      job.payload
    else
      job
    end
  return unless is_unique?(payload)

  hold_for = lock_after_execution_period(payload)
  lock_key = unique_key(queue, payload)
  return redis.del(lock_key) if hold_for == 0

  redis.expire(lock_key, hold_for)
end
.queued?(queue, item) ⇒ Boolean
6 7 8 9 |
# File 'lib/resque_solo/queue.rb', line 6

# Tells whether +item+ currently has a uniqueness marker on +queue+.
#
# Items that are not unique are never reported as queued. For unique items
# the answer is whether the value stored under the item's unique key is "1".
#
# @param queue [#to_s] queue name
# @param item [Object] job payload
# @return [Boolean]
def queued?(queue, item)
  if is_unique?(item)
    stored = redis.get(unique_key(queue, item))
    stored == "1"
  else
    false
  end
end
.unique_key(queue, item) ⇒ Object
30 31 32 |
# File 'lib/resque_solo/queue.rb', line 30

# Builds the Redis key that marks +item+ as queued on +queue+.
#
# The key has the form "solo:queue:<queue>:job:<job key>", where the job key
# comes from calling +redis_key(item)+ on the class resolved by const_for.
#
# @param queue [#to_s] queue name
# @param item [Object] job payload
# @return [String]
def unique_key(queue, item)
  prefix = "solo:queue:#{queue}:job:"
  job_key = const_for(item).redis_key(item)
  "#{prefix}#{job_key}"
end