Class: Sidekiq::PrioritizedQueues::Fetch
- Inherits:
-
Object
- Object
- Sidekiq::PrioritizedQueues::Fetch
- Defined in:
- lib/sidekiq/prioritized_queues/fetch.rb
Defined Under Namespace
Classes: UnitOfWork
Class Method Summary collapse
-
.bulk_requeue(inprogress, options) ⇒ Object
By leaving this as a class method, it can be pluggable and used by the Manager actor.
Instance Method Summary collapse
-
#initialize(options) ⇒ Fetch
constructor
A new instance of Fetch.
- #queues ⇒ Object
- #retrieve_work ⇒ Object
Constructor Details
#initialize(options) ⇒ Fetch
Returns a new instance of Fetch.
4 5 6 7 8 |
# File 'lib/sidekiq/prioritized_queues/fetch.rb', line 4 def initialize() @strictly_ordered_queues = !![:strict] @queues = [:queues].map { |q| "queue:#{q}" } @unique_queues = @queues.uniq end |
Class Method Details
.bulk_requeue(inprogress, options) ⇒ Object
By leaving this as a class method, it can be pluggable and used by the Manager actor. Making it an instance method will make it async to the Fetcher actor
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/sidekiq/prioritized_queues/fetch.rb', line 32 def self.bulk_requeue(inprogress, ) return if inprogress.empty? Sidekiq.logger.debug { "Re-queueing terminated jobs" } jobs_to_requeue = {} inprogress.each do |unit_of_work| jobs_to_requeue[unit_of_work.queue_name] ||= [] jobs_to_requeue[unit_of_work.queue_name] << unit_of_work. end Sidekiq.redis do |conn| conn.pipelined do jobs_to_requeue.each do |queue, jobs| jobs.each { |job| conn.zadd("queue:#{queue}", 0, job) } end end end Sidekiq.logger.info("Pushed #{inprogress.size} messages back to Redis") rescue => ex Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.}") end |
Instance Method Details
#queues ⇒ Object
68 69 70 |
# File 'lib/sidekiq/prioritized_queues/fetch.rb', line 68 def queues @strictly_ordered_queues ? @unique_queues.dup : @queues.shuffle.uniq end |
#retrieve_work ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/sidekiq/prioritized_queues/fetch.rb', line 10 def retrieve_work work = nil Sidekiq.redis do |conn| queues.find do |queue| response = conn.multi do conn.zrange(queue, 0, 0) conn.zremrangebyrank(queue, 0, 0) end.flatten(1) next if response.length == 1 work = [queue, response.first] break end end return UnitOfWork.new(*work) if work sleep 1; nil end |