Class: Sidekiq::PriorityQueue::ReliableFetch
- Inherits:
-
Object
- Object
- Sidekiq::PriorityQueue::ReliableFetch
- Includes:
- Util
- Defined in:
- lib/sidekiq/priority_queue/reliable_fetch.rb
Defined Under Namespace
Classes: UnitOfWork
Constant Summary collapse
- SUPER_PROCESSES_REGISTRY_KEY =
'super_processes_priority'
Instance Method Summary collapse
-
#bulk_requeue(_inprogress, _options) ⇒ Object
Called when the Sidekiq process is shut down gracefully.
-
#initialize(options) ⇒ ReliableFetch
constructor
A new instance of ReliableFetch.
- #queues_cmd ⇒ Object
- #retrieve_work ⇒ Object
- #setup ⇒ Object
- #spop(wip_queue) ⇒ Object
- #wip_queue(q) ⇒ Object
- #zpopmin_sadd(queue, wip_queue) ⇒ Object
Constructor Details
#initialize(options) ⇒ ReliableFetch
Returns a new instance of ReliableFetch.
42 43 44 45 46 47 48 49 |
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 42
#
# Builds a fetcher from an options hash.
#
# @param options [Hash] configuration; the keys read here are
#   +:queues+ (enumerable of queue names, each prefixed with "priority-queue:"),
#   +:strict+ (coerced to a boolean) and
#   +:index+ (falls back to ENV['PROCESS_INDEX'] when absent or falsy)
def initialize(options)
  @options = options
  @strictly_ordered_queues = !!options[:strict]
  @queues = options[:queues].map { |q| "priority-queue:#{q}" }
  # Repeated queue names are only collapsed when strict ordering is requested.
  @queues = @queues.uniq if @strictly_ordered_queues
  @done = false
  @process_index = options[:index] || ENV['PROCESS_INDEX']
end
Instance Method Details
#bulk_requeue(_inprogress, _options) ⇒ Object
Called when the Sidekiq process is shut down gracefully.
99 100 101 102 103 |
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 99
#
# Called when the Sidekiq process is shut down gracefully. Logs a debug
# message, then calls +requeue_wip_jobs+ followed by
# +unregister_super_process+ (both defined outside this listing).
#
# @param _inprogress [Object] unused
# @param _options [Object] unused
# @return [Object] whatever +unregister_super_process+ returns
def bulk_requeue(_inprogress, _options)
  Sidekiq.logger.debug { 'Priority ReliableFetch: Re-queueing terminated jobs' }
  requeue_wip_jobs
  unregister_super_process
end
#queues_cmd ⇒ Object
90 91 92 93 94 95 96 |
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 90
#
# Returns the queue keys in the order they should be polled.
#
# With strict ordering the stored list is returned as-is (the same Array
# object, not a copy). Otherwise a freshly shuffled copy with duplicates
# removed is returned, so the order differs from call to call.
#
# @return [Array<String>]
def queues_cmd
  return @queues if @strictly_ordered_queues

  @queues.shuffle.uniq
end
#retrieve_work ⇒ Object
65 66 67 68 69 70 71 72 73 |
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 65
#
# Fetches the next job, if any.
#
# Returns nil once +@done+ has been set. Otherwise each queue is tried in
# turn via +zpopmin_sadd(queue, wip_queue(queue))+ and the first truthy
# result is wrapped in a UnitOfWork together with its queue and WIP key.
#
# @return [UnitOfWork, nil] nil when shutting down or when no queue yielded a job
def retrieve_work
  return nil if @done

  # Poll in the order supplied by queues_cmd rather than the raw @queues
  # list, so that non-strict mode gets its shuffled, de-duplicated order
  # instead of always starting with the first configured queue.
  queues_cmd.each do |queue|
    wip = wip_queue(queue)
    job = zpopmin_sadd(queue, wip)
    return UnitOfWork.new(queue, job, wip) if job
  end

  nil
end
#setup ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 51
#
# Registers this fetcher's lifecycle callbacks through +Sidekiq.on+:
#
# * +:startup+   - calls +cleanup_the_dead+, +register_myself+, then +check+
# * +:shutdown+  - sets +@done+ so that +retrieve_work+ stops handing out jobs
# * +:heartbeat+ - calls +register_myself+ again
#
# The helper methods named above are defined outside this listing.
def setup
  Sidekiq.on(:startup) do
    cleanup_the_dead
    register_myself
    check
  end

  Sidekiq.on(:shutdown) { @done = true }

  Sidekiq.on(:heartbeat) { register_myself }
end
#spop(wip_queue) ⇒ Object
86 87 88 |
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 86
#
# Issues SPOP against the given key on a connection obtained from
# +Sidekiq.redis+.
#
# @param wip_queue [String] key to pop from
# @return [Object] whatever +Sidekiq.redis+ returns for the block
#   (presumably the popped member, or nil when the set is empty - confirm)
def spop(wip_queue)
  Sidekiq.redis do |conn|
    conn.spop(wip_queue)
  end
end
#wip_queue(q) ⇒ Object
75 76 77 |
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 75
#
# Builds the key name paired with queue +q+ for this process: the fixed
# prefix "queue:spriorityq", the value of +identity+ and +q+, joined by "|".
# (+identity+ is not defined in this listing; presumably it comes from the
# included Util module - confirm.)
#
# @param q [String] queue key
# @return [String]
def wip_queue(q)
  ['queue:spriorityq', identity, q].join('|')
end
#zpopmin_sadd(queue, wip_queue) ⇒ Object
79 80 81 82 83 84 |
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 79
#
# Runs the ZPOPMIN_SADD script with +queue+ and +wip_queue+ as its keys.
#
# The script source is loaded on first use and its SHA is memoized in
# +@script_sha+, so later calls only issue EVALSHA.
# NOTE(review): judging by its name, the script presumably pops the
# lowest-scored member of +queue+ and adds it to +wip_queue+ - confirm
# against Sidekiq::PriorityQueue::Scripts.
#
# @param queue [String] first key passed to the script
# @param wip_queue [String] second key passed to the script
# @return [Object] whatever +Sidekiq.redis+ returns for the block
def zpopmin_sadd(queue, wip_queue)
  Sidekiq.redis do |conn|
    @script_sha ||= conn.script(:load, Sidekiq::PriorityQueue::Scripts::ZPOPMIN_SADD)
    keys = [queue, wip_queue]
    conn.evalsha(@script_sha, keys)
  end
end