Class: ResqueSolo::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/resque_solo/queue.rb

Class Method Summary collapse

Class Method Details

.cleanup(queue) ⇒ Object



64
65
66
67
68
69
70
71
# File 'lib/resque_solo/queue.rb', line 64

# Deletes every uniqueness key recorded for +queue+.
#
# Walks the keyspace with SCAN, using the pattern
# "solo:queue:<queue>:job:*", and deletes each non-empty batch of keys.
# Iteration stops once the cursor returned by SCAN converts to zero.
#
# @param queue [#to_s] queue name interpolated into the match pattern
# @return [nil]
def cleanup(queue)
  pattern = "solo:queue:#{queue}:job:*"
  cursor = "0"
  finished = false

  until finished
    cursor, batch = redis.scan(cursor, match: pattern)
    redis.del(*batch) if batch.any?
    # A cursor of zero signals that the scan has covered the keyspace.
    finished = cursor.to_i.zero?
  end
end

.destroy(queue, klass, *args) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
# File 'lib/resque_solo/queue.rb', line 52

# Releases the uniqueness marker of matching jobs stored in +queue+.
#
# Reads every entry of the Redis list "queue:<queue>", decodes it with
# Resque.decode and calls ResqueSolo::Queue.mark_unqueued for each entry
# whose "class" equals +klass+ (compared as a String). When any +args+ are
# supplied, the entry's "args" must also equal them. This method does not
# remove anything from the list itself.
#
# @param queue [#to_s] queue name
# @param klass [#to_s] job class, or its name
# @param args [Array] optional job arguments to match exactly
# @return [Array] the raw entries read from the queue list
def destroy(queue, klass, *args)
  klass = klass.to_s
  redis_queue = "queue:#{queue}"
  # Use empty? rather than any?: any? tests element truthiness, so an
  # explicit nil or false argument would otherwise disable the filter and
  # unmark every job of the class.
  match_args = !args.empty?

  redis.lrange(redis_queue, 0, -1).each do |string|
    json = Resque.decode(string)
    next unless json["class"] == klass
    next if match_args && json["args"] != args
    ResqueSolo::Queue.mark_unqueued(queue, json)
  end
end

.is_unique?(item) ⇒ Boolean

Returns:

  • (Boolean)


34
35
36
37
38
# File 'lib/resque_solo/queue.rb', line 34

# Reports whether the job class resolved for +item+ mixes in
# ::Resque::Plugins::UniqueJob.
#
# Any NameError raised while resolving or inspecting the class (this also
# covers NoMethodError, its subclass) is treated as "not unique".
#
# @param item [Object] job payload handed to const_for
# @return [Boolean]
def is_unique?(item)
  job_class = const_for(item)
  mixins = job_class.included_modules
  mixins.include?(::Resque::Plugins::UniqueJob)
rescue NameError
  false
end

.item_ttl(item) ⇒ Object



40
41
42
43
44
# File 'lib/resque_solo/queue.rb', line 40

# Looks up the +ttl+ declared by the job class resolved for +item+.
#
# Falls back to -1 when resolving the class or calling +ttl+ raises
# NameError (which includes NoMethodError).
#
# @param item [Object] job payload handed to const_for
# @return [Object] whatever the class's +ttl+ returns, or -1 on NameError
def item_ttl(item)
  job_class = const_for(item)
  job_class.ttl
rescue NameError
  -1
end

.lock_after_execution_period(item) ⇒ Object



46
47
48
49
50
# File 'lib/resque_solo/queue.rb', line 46

# Looks up the +lock_after_execution_period+ declared by the job class
# resolved for +item+.
#
# Falls back to 0 when resolving the class or calling the accessor raises
# NameError (which includes NoMethodError).
#
# @param item [Object] job payload handed to const_for
# @return [Object] the class's declared period, or 0 on NameError
def lock_after_execution_period(item)
  job_class = const_for(item)
  job_class.lock_after_execution_period
rescue NameError
  0
end

.mark_queued(queue, item) ⇒ Object



11
12
13
14
15
16
17
# File 'lib/resque_solo/queue.rb', line 11

# Records that +item+ is queued on +queue+ by setting its unique key to 1.
#
# Does nothing for items that is_unique? rejects. After the key is set, an
# expiry is applied only when item_ttl returns a value >= 0.
#
# @param queue [#to_s] queue name
# @param item [Object] job payload
# @return [Object, nil] the result of the EXPIRE call when one is made,
#   otherwise nil
def mark_queued(queue, item)
  if is_unique?(item)
    lock_key = unique_key(queue, item)
    redis.set(lock_key, 1)

    expiry = item_ttl(item)
    redis.expire(lock_key, expiry) if expiry >= 0
  end
end

.mark_unqueued(queue, job) ⇒ Object



19
20
21
22
23
24
25
26
27
28
# File 'lib/resque_solo/queue.rb', line 19

# Releases the uniqueness marker for +job+ on +queue+.
#
# Accepts either a Resque::Job (its +payload+ is used) or the payload
# itself. Items that is_unique? rejects are ignored. When
# lock_after_execution_period is 0 the key is deleted; otherwise the key is
# given that value as its expiry.
#
# @param queue [#to_s] queue name
# @param job [Resque::Job, Object] job or raw payload
# @return [Object, nil] the result of the DEL or EXPIRE call, or nil when
#   the item is not unique
def mark_unqueued(queue, job)
  payload = job.is_a?(Resque::Job) ? job.payload : job
  return unless is_unique?(payload)

  hold_for = lock_after_execution_period(payload)
  lock_key = unique_key(queue, payload)
  return redis.del(lock_key) if hold_for == 0

  redis.expire(lock_key, hold_for)
end

.queued?(queue, item) ⇒ Boolean

Returns:

  • (Boolean)


6
7
8
9
# File 'lib/resque_solo/queue.rb', line 6

# Reports whether +item+ is currently marked as queued on +queue+.
#
# Items that is_unique? rejects are never considered queued. Otherwise the
# item's unique key is read from Redis and compared with the string "1".
#
# @param queue [#to_s] queue name
# @param item [Object] job payload
# @return [Boolean]
def queued?(queue, item)
  if is_unique?(item)
    marker = redis.get(unique_key(queue, item))
    marker == "1"
  else
    false
  end
end

.unique_key(queue, item) ⇒ Object



30
31
32
# File 'lib/resque_solo/queue.rb', line 30

# Builds the Redis key that marks +item+ as queued on +queue+.
#
# The key has the form "solo:queue:<queue>:job:<job key>", where the job
# key is obtained by calling +redis_key+ on the class resolved for +item+.
#
# @param queue [#to_s] queue name
# @param item [Object] job payload handed to const_for and redis_key
# @return [String]
def unique_key(queue, item)
  job_class = const_for(item)
  job_key = job_class.redis_key(item)
  "solo:queue:#{queue}:job:#{job_key}"
end