Class: Sidekiq::BasicFetch
- Inherits: Object
  - Object
  - Sidekiq::BasicFetch
- Defined in: lib/sidekiq/fetch.rb
Defined Under Namespace
Classes: UnitOfWork
Class Method Summary
- .bulk_requeue(inprogress, options) ⇒ Object
  Kept as a class method so that it stays pluggable and can be used by the Manager actor.
Instance Method Summary
- #initialize(options) ⇒ BasicFetch (constructor)
  A new instance of BasicFetch.
- #queues_cmd ⇒ Object
  Creating the Redis#brpop command takes into account any configured queue weights.
- #retrieve_work ⇒ Object
Constructor Details
#initialize(options) ⇒ BasicFetch
Returns a new instance of BasicFetch.
    # File 'lib/sidekiq/fetch.rb', line 95
    def initialize(options)
      @strictly_ordered_queues = !!options[:strict]
      @queues = options[:queues].map { |q| "queue:#{q}" }
      @unique_queues = @queues.uniq
    end
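As a rough illustration of what the constructor derives from its options, the sketch below copies the same three assignments into a stand-alone class. The class name `FetchOptionsSketch` is hypothetical, and the example assumes that a queue configured with weight N appears N times in `options[:queues]`.

```ruby
# Stand-in that copies only the constructor logic shown above.
# The class name is hypothetical; it is not part of Sidekiq.
class FetchOptionsSketch
  attr_reader :strictly_ordered_queues, :queues, :unique_queues

  def initialize(options)
    @strictly_ordered_queues = !!options[:strict]          # nil becomes false
    @queues = options[:queues].map { |q| "queue:#{q}" }    # prefix each name
    @unique_queues = @queues.uniq                          # one entry per queue
  end
end

# Assumption: "critical" was configured with weight 2, "default" with weight 1.
sketch = FetchOptionsSketch.new({ queues: %w[critical critical default] })

sketch.queues                   # ["queue:critical", "queue:critical", "queue:default"]
sketch.unique_queues            # ["queue:critical", "queue:default"]
sketch.strictly_ordered_queues  # false, because :strict was not given
```

The repeated entries in `queues` are what a weighted shuffle can draw from, while `unique_queues` keeps the configured order for strict mode.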
Class Method Details
.bulk_requeue(inprogress, options) ⇒ Object
Kept as a class method so that it stays pluggable and can be used by the Manager actor. Making it an instance method would make the call asynchronous to the Fetcher actor.
    # File 'lib/sidekiq/fetch.rb', line 108
    def self.bulk_requeue(inprogress, options)
      return if inprogress.empty?

      Sidekiq.logger.debug { "Re-queueing terminated jobs" }
      jobs_to_requeue = {}
      inprogress.each do |unit_of_work|
        jobs_to_requeue[unit_of_work.queue_name] ||= []
        jobs_to_requeue[unit_of_work.queue_name] << unit_of_work.message
      end

      Sidekiq.redis do |conn|
        conn.pipelined do
          jobs_to_requeue.each do |queue, jobs|
            conn.rpush("queue:#{queue}", jobs)
          end
        end
      end
      Sidekiq.logger.info("Pushed #{inprogress.size} messages back to Redis")
    rescue => ex
      Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
    end
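The grouping step in `bulk_requeue` can be tried without Redis. The sketch below is a minimal stand-alone version of that step only: `WorkSketch` and `group_for_requeue` are hypothetical names, and it assumes each unit of work exposes `queue_name` and a `message` payload. It folds the "queue:" prefix into the hash key so the result shows what each `rpush` would receive.

```ruby
# Hypothetical stand-in for UnitOfWork, with only the two readers used here.
WorkSketch = Struct.new(:queue_name, :message)

# Groups in-progress payloads by their Redis list key, so that each queue
# needs a single RPUSH rather than one call per job.
def group_for_requeue(inprogress)
  inprogress.each_with_object({}) do |unit, grouped|
    (grouped["queue:#{unit.queue_name}"] ||= []) << unit.message
  end
end

inprogress = [
  WorkSketch.new("default", '{"jid":"a"}'),
  WorkSketch.new("mailers", '{"jid":"b"}'),
  WorkSketch.new("default", '{"jid":"c"}')
]

grouped = group_for_requeue(inprogress)
# grouped["queue:default"] holds the payloads for jids "a" and "c", in order
# grouped["queue:mailers"] holds the payload for jid "b"
```

Batching per queue keeps the pipelined block small: the number of Redis commands depends on how many queues are involved, not on how many jobs were in flight.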
Instance Method Details
#queues_cmd ⇒ Object
Creating the Redis#brpop command takes into account any configured queue weights. By default Redis#brpop returns data from the first queue that has pending elements. We recreate the queue command each time we invoke Redis#brpop to honor weights and avoid queue starvation.
    # File 'lib/sidekiq/fetch.rb', line 151
    def queues_cmd
      queues = @strictly_ordered_queues ? @unique_queues.dup : @queues.shuffle.uniq
      queues << Sidekiq::Fetcher::TIMEOUT
    end
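To see why `shuffle.uniq` honors weights, the following sketch re-creates the same expression outside the class and measures how often each queue lands in first position. It assumes weights are expressed as repeated entries in the queue list; `queues_cmd_sketch` and `TIMEOUT_SKETCH` are hypothetical names, and the timeout value is a placeholder.

```ruby
# Assumption: a placeholder for Sidekiq::Fetcher::TIMEOUT, whose value is not shown here.
TIMEOUT_SKETCH = 1

# Mirrors the queues_cmd logic, with the queue list and RNG passed in explicitly.
def queues_cmd_sketch(queues, strict:, rng: Random.new)
  ordered = strict ? queues.uniq : queues.shuffle(random: rng).uniq
  ordered << TIMEOUT_SKETCH
end

# Assumption: "critical" has weight 3 and "default" has weight 1.
weighted = %w[queue:critical queue:critical queue:critical queue:default]

rng = Random.new(42)
trials = 10_000
critical_first = trials.times.count do
  queues_cmd_sketch(weighted, strict: false, rng: rng).first == "queue:critical"
end
critical_share = critical_first.fdiv(trials) # close to 0.75 for a 3:1 weighting

# In strict mode the configured order is always kept.
strict_cmd = queues_cmd_sketch(weighted, strict: true)
```

Because the first element of a shuffled list is a uniform draw from the weighted entries, a queue with weight 3 out of 4 leads the command about three times in four, while the lower-weight queue still leads often enough to avoid starvation.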
#retrieve_work ⇒ Object
    # File 'lib/sidekiq/fetch.rb', line 101
    def retrieve_work
      work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
      UnitOfWork.new(*work) if work
    end
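The flow of `retrieve_work` can be sketched with an in-memory stand-in for the Redis connection. Everything named below (`FakeConn`, `UnitOfWorkSketch`, `retrieve_work_sketch`) is hypothetical. The fake only imitates the return shape of `Redis#brpop`, a `[key, value]` pair or `nil`, and returns immediately instead of blocking until the timeout.

```ruby
# Hypothetical stand-in for UnitOfWork: the list key and the raw payload.
UnitOfWorkSketch = Struct.new(:queue, :message)

# In-memory imitation of a Redis connection. Unlike real BRPOP it never blocks.
class FakeConn
  def initialize(lists)
    @lists = lists
  end

  # Accepts keys followed by a timeout, like the command built by queues_cmd.
  # Pops from the tail of the first non-empty list, checked in argument order.
  def brpop(*args)
    keys = args[0...-1] # the last argument is the timeout
    keys.each do |key|
      list = @lists[key]
      return [key, list.pop] if list && !list.empty?
    end
    nil # what the caller sees when nothing arrived before the timeout
  end
end

def retrieve_work_sketch(conn, cmd)
  work = conn.brpop(*cmd)
  UnitOfWorkSketch.new(*work) if work
end

conn = FakeConn.new({ "queue:default" => %w[job1 job2] })
cmd = ["queue:critical", "queue:default", 1]

first = retrieve_work_sketch(conn, cmd)  # queue "queue:default", message "job2"
second = retrieve_work_sketch(conn, cmd) # queue "queue:default", message "job1"
third = retrieve_work_sketch(conn, cmd)  # nil, since every list is now empty
```

Splatting the pair into the struct is what lets the method return `nil` untouched on a timeout while still wrapping a successful pop in a single expression.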