Class: Sidekiq::BasicFetch
- Inherits:
-
Object
- Object
- Sidekiq::BasicFetch
- Defined in:
- lib/sidekiq/fetch.rb
Defined Under Namespace
Classes: UnitOfWork
Class Method Summary collapse
-
.bulk_requeue(inprogress, options) ⇒ Object
By leaving this as a class method, it can be pluggable and used by the Manager actor.
Instance Method Summary collapse
-
#initialize(options) ⇒ BasicFetch
constructor
A new instance of BasicFetch.
-
#queues_cmd ⇒ Object
Creating the Redis#blpop command takes into account any configured queue weights.
- #retrieve_work ⇒ Object
Constructor Details
#initialize(options) ⇒ BasicFetch
Returns a new instance of BasicFetch.
82 83 84 85 86 |
# File 'lib/sidekiq/fetch.rb', line 82 def initialize() @strictly_ordered_queues = !![:strict] @queues = [:queues].map { |q| "queue:#{q}" } @unique_queues = @queues.uniq end |
Class Method Details
.bulk_requeue(inprogress, options) ⇒ Object
By leaving this as a class method, it can be pluggable and used by the Manager actor. Making it an instance method will make it async to the Fetcher actor
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/sidekiq/fetch.rb', line 95 def self.bulk_requeue(inprogress, ) return if inprogress.empty? Sidekiq.logger.debug { "Re-queueing terminated jobs" } jobs_to_requeue = {} inprogress.each do |unit_of_work| jobs_to_requeue[unit_of_work.queue_name] ||= [] jobs_to_requeue[unit_of_work.queue_name] << unit_of_work. end Sidekiq.redis do |conn| jobs_to_requeue.each do |queue, jobs| conn.rpush("queue:#{queue}", jobs) end end Sidekiq.logger.info("Pushed #{inprogress.size} messages back to Redis") rescue => ex Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.}") end |
Instance Method Details
#queues_cmd ⇒ Object
Creating the Redis#blpop command takes into account any configured queue weights. By default Redis#blpop returns data from the first queue that has pending elements. We recreate the queue command each time we invoke Redis#blpop to honor weights and avoid queue starvation.
136 137 138 139 |
# File 'lib/sidekiq/fetch.rb', line 136 def queues_cmd queues = @strictly_ordered_queues ? @unique_queues.dup : @queues.shuffle.uniq queues << Sidekiq::Fetcher::TIMEOUT end |
#retrieve_work ⇒ Object
88 89 90 91 |
# File 'lib/sidekiq/fetch.rb', line 88 def retrieve_work work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) } UnitOfWork.new(*work) if work end |