Class: Sidekiq::PriorityQueue::ReliableFetch::UnitOfWork

Inherits:
Struct < Object
Defined in:
lib/sidekiq/priority_queue/reliable_fetch.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#job ⇒ Object

Returns the value of attribute job

Returns:

  • (Object)

    the current value of job



13
14
15
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 13

# Reader for the +job+ attribute.
#
# NOTE(review): the class is listed as inheriting from Struct, so this body is
# presumably the documentation tool's rendering of a Struct member rather than
# hand-written code — confirm against the real source file.
# The value is handed to JSON.parse in #subqueue and to SREM in #acknowledge,
# so it is presumably the raw JSON job payload string — TODO confirm.
#
# @return [Object] the current value of job
def job
  @job
end

#queue ⇒ Object

Returns the value of attribute queue

Returns:

  • (Object)

    the current value of queue



13
14
15
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 13

# Reader for the +queue+ attribute.
#
# NOTE(review): the class is listed as inheriting from Struct, so this body is
# presumably the documentation tool's rendering of a Struct member rather than
# hand-written code — confirm against the real source file.
# #queue_name calls String#sub on this value to strip everything up to and
# including "queue:", so it is presumably a Redis key such as "queue:<name>"
# — TODO confirm.
#
# @return [Object] the current value of queue
def queue
  @queue
end

#wip_queue ⇒ Object

Returns the value of attribute wip_queue

Returns:

  • (Object)

    the current value of wip_queue



13
14
15
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 13

# Reader for the +wip_queue+ attribute.
#
# NOTE(review): the class is listed as inheriting from Struct, so this body is
# presumably the documentation tool's rendering of a Struct member rather than
# hand-written code — confirm against the real source file.
# #acknowledge passes this value as the key of an SREM call, so it presumably
# names the Redis set holding in-progress jobs — TODO confirm.
#
# @return [Object] the current value of wip_queue
def wip_queue
  @wip_queue
end

Instance Method Details

#acknowledge ⇒ Object



14
15
16
17
18
19
20
21
22
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 14

# Marks this unit of work as finished in Redis.
#
# Always removes +job+ from the +wip_queue+ set (SREM). When the job has a
# subqueue, the subqueue's score in the +subqueue_counts+ sorted set is also
# decremented by one (ZINCRBY), and the member is removed (ZREM) once the new
# score drops below 1.
#
# @return [Object, nil] the value Sidekiq.redis yields back for the block; the
#   block itself evaluates to the ZREM reply when the member was removed and
#   to nil otherwise
def acknowledge
  Sidekiq.redis do |redis|
    redis.srem(wip_queue, job)

    # Jobs without a subqueue have no counter to maintain.
    sub = subqueue
    next if sub.nil?

    counts_key = subqueue_counts
    remaining = redis.zincrby(counts_key, -1, sub)
    next unless remaining < 1

    # The counter reached zero (or below), so drop the member entirely.
    redis.zrem(counts_key, sub)
  end
end

#queue_name ⇒ Object



24
25
26
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 24

# Derives the bare queue name from +queue+.
#
# Everything up to and including the last "queue:" on the first matching line
# is stripped (the pattern is greedy); a value without "queue:" is returned
# unchanged.
#
# @return [String] +queue+ with its leading "...queue:" portion removed
def queue_name
  prefix_pattern = /.*queue:/
  queue.sub(prefix_pattern, '')
end

#requeue ⇒ Object



37
38
39
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 37

# Intentionally a no-op: the body contains no statements, so it returns nil.
#
# Per the inline note, nothing has to be pushed back because the job is still
# held in the WIP queue. In the code shown here, #acknowledge is the only place
# that removes a job from +wip_queue+.
# NOTE(review): whatever re-processes entries left in the WIP queue is not
# visible in this file section — confirm where recovery happens.
#
# @return [nil]
def requeue
  # Nothing needed. Jobs are in WIP queue.
end

#subqueue ⇒ Object



28
29
30
31
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 28

# Looks up the "subqueue" entry of the job payload.
#
# The payload is parsed from +job+ with JSON.parse on first use and cached in
# @parsed_job, so later calls reuse the parsed result instead of re-parsing.
#
# @return [Object, nil] the value stored under "subqueue", or nil when the
#   parsed payload has no such key
def subqueue
  @parsed_job = JSON.parse(job) unless @parsed_job
  payload = @parsed_job
  payload['subqueue']
end

#subqueue_counts ⇒ Object



33
34
35
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 33

# Builds the key under which per-subqueue counts are kept for this queue.
#
# #acknowledge uses the result as the key for its ZINCRBY/ZREM calls.
#
# @return [String] "priority-queue-counts:" followed by +queue_name+
def subqueue_counts
  name = queue_name
  "priority-queue-counts:#{name}"
end