Class: Sidekiq::PriorityQueue::Fetch

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/priority_queue/fetch.rb

Defined Under Namespace

Classes: UnitOfWork

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Fetch

Returns a new instance of Fetch.



38
39
40
41
42
# File 'lib/sidekiq/priority_queue/fetch.rb', line 38

# Builds the fetcher state from the given options.
#
# @param options [Hash] reads +:strict+ (coerced to a boolean) and +:queues+
#   (a list of queue names)
# @return [Fetch] a new instance of Fetch
def initialize(options)
  @strictly_ordered_queues = options[:strict] ? true : false

  # Every queue name is stored under its "priority-queue:" key.
  prefixed = options[:queues].map { |name| "priority-queue:#{name}" }

  # Duplicates are only dropped in strict mode; otherwise they are kept as-is.
  @queues = @strictly_ordered_queues ? prefixed.uniq : prefixed
end

Instance Method Details

#bulk_requeue(inprogress, options) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/sidekiq/priority_queue/fetch.rb', line 64

# Pushes the jobs of the given units of work back onto their sorted-set
# queues in a single pipelined round trip.
#
# Each job is re-added with score 0 (presumably so that it is popped ahead of
# other jobs -- confirm against the ZPOPMIN script). Any error raised while
# re-queueing is logged as a warning and swallowed, making this best-effort.
#
# @param inprogress [#empty?, #each, #size] units of work responding to
#   +queue+ and +job+
# @param options [Hash] unused; kept for interface compatibility
# @return [Object] nil when +inprogress+ is empty, otherwise the logger's
#   return value
def bulk_requeue(inprogress, options)
  return if inprogress.empty?

  Sidekiq.logger.debug { "Re-queueing terminated jobs" }

  # Group job payloads by the queue they were fetched from.
  jobs_to_requeue = Hash.new { |grouped, queue| grouped[queue] = [] }
  inprogress.each do |unit_of_work|
    jobs_to_requeue[unit_of_work.queue] << unit_of_work.job
  end

  Sidekiq.redis do |conn|
    # Commands must be issued on the yielded pipeline object: calling them on
    # the outer connection inside the block is deprecated in redis-rb 4.6 and
    # is no longer pipelined in redis-rb 5.
    conn.pipelined do |pipeline|
      jobs_to_requeue.each do |queue, jobs|
        pipeline.zadd(queue, jobs.map { |job| [0, job] })
      end
    end
  end
  Sidekiq.logger.info("Pushed #{inprogress.size} jobs back to Redis")
rescue => ex
  Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end

#queues_cmd ⇒ Object



56
57
58
59
60
61
62
# File 'lib/sidekiq/priority_queue/fetch.rb', line 56

# Returns the queue keys in the order they should be polled.
#
# In strict mode the stored list is returned unchanged. Otherwise the list is
# shuffled and then de-duplicated, so a name that appears more often is more
# likely to end up near the front.
#
# @return [Array<String>] queue keys
def queues_cmd
  return @queues if @strictly_ordered_queues

  @queues.shuffle.uniq
end

#retrieve_work ⇒ Object



44
45
46
47
# File 'lib/sidekiq/priority_queue/fetch.rb', line 44

# Pops the next available job, trying each queue in polling order.
#
# Queues are taken from #queues_cmd rather than the raw @queues list, so
# non-strict mode uses the shuffled, de-duplicated order and never polls the
# same queue twice in one pass.
#
# @return [UnitOfWork, nil] the fetched unit of work, or nil when every queue
#   yielded no job
def retrieve_work
  queues_cmd.each do |queue|
    job = zpopmin(queue)
    return UnitOfWork.new(queue, job) if job
  end

  nil
end

#zpopmin(queue) ⇒ Object



49
50
51
52
53
54
# File 'lib/sidekiq/priority_queue/fetch.rb', line 49

# Runs the ZPOPMIN server-side script against the given queue key.
#
# The script is loaded once and its SHA cached in @script_sha. If Redis no
# longer knows the script (its reply starts with "NOSCRIPT", e.g. after a
# restart or SCRIPT FLUSH), the script is loaded again and the call retried
# once, so a stale cached SHA does not break every later fetch.
#
# @param queue [String] the queue key passed to the script
# @return [Object] whatever the script returns for that key
# @raise [StandardError] any error other than NOSCRIPT, or a second failure
#   after the script has been reloaded
def zpopmin(queue)
  Sidekiq.redis do |con|
    @script_sha ||= con.script(:load, Sidekiq::PriorityQueue::Scripts::ZPOPMIN)
    begin
      con.evalsha(@script_sha, [queue])
    rescue StandardError => e
      # Matching on the message keeps this independent of the client's
      # error class.
      raise unless e.message.start_with?('NOSCRIPT')

      @script_sha = con.script(:load, Sidekiq::PriorityQueue::Scripts::ZPOPMIN)
      con.evalsha(@script_sha, [queue])
    end
  end
end