Class: Sidekiq::BasicFetch
- Inherits:
-
Object
- Object
- Sidekiq::BasicFetch
- Defined in:
- lib/sidekiq/fetch.rb
Defined Under Namespace
Classes: UnitOfWork
Constant Summary collapse
- TIMEOUT =
We want the fetch operation to timeout every few seconds so the thread can check if the process is shutting down.
2
Class Method Summary collapse
-
.bulk_requeue(inprogress, options) ⇒ Object
By leaving this as a class method, it can be pluggable and used by the Manager actor.
Instance Method Summary collapse
-
#initialize(options) ⇒ BasicFetch
constructor
A new instance of BasicFetch.
-
#queues_cmd ⇒ Object
Creating the Redis#brpop command takes into account any configured queue weights.
- #retrieve_work ⇒ Object
Constructor Details
#initialize(options) ⇒ BasicFetch
Returns a new instance of BasicFetch.
27 28 29 30 31 32 33 34 |
# File 'lib/sidekiq/fetch.rb', line 27 def initialize() @strictly_ordered_queues = !![:strict] @queues = [:queues].map { |q| "queue:#{q}" } if @strictly_ordered_queues @queues.uniq! @queues << TIMEOUT end end |
Class Method Details
.bulk_requeue(inprogress, options) ⇒ Object
By leaving this as a class method, it can be pluggable and used by the Manager actor. Making it an instance method will make it async to the Fetcher actor
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/sidekiq/fetch.rb', line 58 def self.bulk_requeue(inprogress, ) return if inprogress.empty? Sidekiq.logger.debug { "Re-queueing terminated jobs" } jobs_to_requeue = {} inprogress.each do |unit_of_work| jobs_to_requeue[unit_of_work.queue] ||= [] jobs_to_requeue[unit_of_work.queue] << unit_of_work.job end Sidekiq.redis do |conn| conn.pipelined do jobs_to_requeue.each do |queue, jobs| conn.rpush(queue, jobs) end end end Sidekiq.logger.info("Pushed #{inprogress.size} jobs back to Redis") rescue => ex Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.}") end |
Instance Method Details
#queues_cmd ⇒ Object
Creating the Redis#brpop command takes into account any configured queue weights. By default Redis#brpop returns data from the first queue that has pending elements. We recreate the queue command each time we invoke Redis#brpop to honor weights and avoid queue starvation.
46 47 48 49 50 51 52 53 54 |
# File 'lib/sidekiq/fetch.rb', line 46 def queues_cmd if @strictly_ordered_queues @queues else queues = @queues.shuffle!.uniq queues << TIMEOUT queues end end |
#retrieve_work ⇒ Object
36 37 38 39 |
# File 'lib/sidekiq/fetch.rb', line 36 def retrieve_work work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) } UnitOfWork.new(*work) if work end |