Class: Sidekiq::BasicFetch

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/fetch.rb

Defined Under Namespace

Classes: UnitOfWork

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ BasicFetch

Returns a new instance of BasicFetch.



82
83
84
85
86
# File 'lib/sidekiq/fetch.rb', line 82

def initialize(options)
  @strictly_ordered_queues = !!options[:strict]
  @queues = options[:queues].map { |q| "queue:#{q}" }
  @unique_queues = @queues.uniq
end

Class Method Details

.bulk_requeue(inprogress, options) ⇒ Object

By leaving this as a class method, it can be pluggable and used by the Manager actor. Making it an instance method will make it async to the Fetcher actor



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/sidekiq/fetch.rb', line 95

def self.bulk_requeue(inprogress, options)
  return if inprogress.empty?

  Sidekiq.logger.debug { "Re-queueing terminated jobs" }
  jobs_to_requeue = {}
  inprogress.each do |unit_of_work|
    jobs_to_requeue[unit_of_work.queue_name] ||= []
    jobs_to_requeue[unit_of_work.queue_name] << unit_of_work.message
  end

  Sidekiq.redis do |conn|
    jobs_to_requeue.each do |queue, jobs|
      conn.rpush("queue:#{queue}", jobs)
    end
  end
  Sidekiq.logger.info("Pushed #{inprogress.size} messages back to Redis")
rescue => ex
  Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end

Instance Method Details

#queues_cmdObject

Creating the Redis#blpop command takes into account any configured queue weights. By default Redis#blpop returns data from the first queue that has pending elements. We recreate the queue command each time we invoke Redis#blpop to honor weights and avoid queue starvation.



136
137
138
139
# File 'lib/sidekiq/fetch.rb', line 136

def queues_cmd
  queues = @strictly_ordered_queues ? @unique_queues.dup : @queues.shuffle.uniq
  queues << Sidekiq::Fetcher::TIMEOUT
end

#retrieve_workObject



88
89
90
91
# File 'lib/sidekiq/fetch.rb', line 88

def retrieve_work
  work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
  UnitOfWork.new(*work) if work
end