Class: Sidekiq::PriorityQueue::ReliableFetch

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/sidekiq/priority_queue/reliable_fetch.rb

Defined Under Namespace

Classes: UnitOfWork

Constant Summary collapse

SUPER_PROCESSES_REGISTRY_KEY =
'super_processes_priority'

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ ReliableFetch

Returns a new instance of ReliableFetch.



42
43
44
45
46
47
48
49
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 42

# Captures the fetcher configuration.
#
# @param options [Hash] symbol-keyed settings. Reads +:queues+ (enumerable of
#   queue names; each is prefixed with "priority-queue:"), +:strict+ (any
#   truthy value turns on strict ordering) and +:index+ (process index;
#   ENV['PROCESS_INDEX'] is used when it is nil/false).
def initialize(options)
  @options = options
  @strictly_ordered_queues = options[:strict] ? true : false

  prefixed = options[:queues].map { |name| "priority-queue:#{name}" }
  # Duplicates are only collapsed in strict mode; otherwise they are kept.
  @queues = @strictly_ordered_queues ? prefixed.uniq : prefixed

  @done = false
  @process_index = options[:index] || ENV['PROCESS_INDEX']
end

Instance Method Details

#bulk_requeue(_inprogress, _options) ⇒ Object

Called when the Sidekiq process shuts down gracefully.



99
100
101
102
103
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 99

# Shutdown hook. Emits a debug log line, then calls requeue_wip_jobs followed
# by unregister_super_process (in that order). Neither argument is used.
#
# @param _inprogress [Object] ignored
# @param _options [Object] ignored
# @return [Object] whatever unregister_super_process returns
def bulk_requeue(_inprogress, _options)
  Sidekiq.logger.debug do
    'Priority ReliableFetch: Re-queueing terminated jobs'
  end

  requeue_wip_jobs
  unregister_super_process
end

#queues_cmd ⇒ Object



90
91
92
93
94
95
96
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 90

# Lists the queue keys in the order they should be polled.
#
# In strict mode @queues itself is returned as-is. Otherwise a new array is
# returned: a shuffled copy of @queues with duplicate entries removed.
#
# @return [Array<String>]
def queues_cmd
  return @queues if @strictly_ordered_queues

  @queues.shuffle.uniq
end

#retrieve_work ⇒ Object



65
66
67
68
69
70
71
72
73
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 65

# Fetches the next job, if any, and wraps it in a UnitOfWork.
#
# Queues are polled in the order produced by queues_cmd, so strict mode keeps
# the configured order while non-strict mode polls a shuffled, de-duplicated
# list. Iterating @queues directly (as before) ignored the shuffle and hit
# Redis once per duplicate entry.
#
# @return [UnitOfWork, nil] nil once @done is set, or when no polled queue
#   yields a job
def retrieve_work
  return nil if @done

  queues_cmd.each do |queue|
    wip = wip_queue(queue)
    job = zpopmin_sadd(queue, wip)
    # Stop at the first queue that yields a job.
    return UnitOfWork.new(queue, job, wip) if job
  end

  nil
end

#setup ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 51

# Registers this fetcher's Sidekiq lifecycle hooks:
#
# * :startup   - runs cleanup_the_dead, register_myself and check, in order
# * :shutdown  - sets @done (retrieve_work returns nil once it is set)
# * :heartbeat - runs register_myself again
def setup
  Sidekiq.on(:startup) do
    cleanup_the_dead
    register_myself
    check
  end

  Sidekiq.on(:shutdown) { @done = true }

  Sidekiq.on(:heartbeat) { register_myself }
end

#spop(wip_queue) ⇒ Object



86
87
88
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 86

# Issues SPOP against the given key on a connection yielded by Sidekiq.redis.
#
# @param wip_queue [String] key to pop from
# @return [Object] whatever the connection's spop call returns
def spop(wip_queue)
  Sidekiq.redis do |conn|
    conn.spop(wip_queue)
  end
end

#wip_queue(q) ⇒ Object



75
76
77
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 75

# Builds the work-in-progress key name for a queue. The key embeds the value
# returned by identity, followed by the queue key itself.
#
# @param q [String] queue key
# @return [String] "queue:spriorityq|<identity>|<q>"
def wip_queue(q)
  owner = identity
  "queue:spriorityq|#{owner}|#{q}"
end

#zpopmin_sadd(queue, wip_queue) ⇒ Object



79
80
81
82
83
84
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 79

# Runs the ZPOPMIN_SADD Lua script via EVALSHA with +queue+ and +wip_queue+
# passed as the script's keys.
#
# The script is loaded on first use and its SHA is memoized in @script_sha.
# If the server no longer knows that SHA (it answers NOSCRIPT, e.g. after a
# restart or SCRIPT FLUSH), the cached SHA is dropped, the script is loaded
# again and the call is retried once. Any other error, or a second NOSCRIPT,
# is re-raised unchanged.
#
# @param queue [String] key passed as the script's first key
# @param wip_queue [String] key passed as the script's second key
# @return [Object] whatever the script evaluation returns
def zpopmin_sadd(queue, wip_queue)
  reloaded = false

  begin
    Sidekiq.redis do |conn|
      @script_sha ||= conn.script(:load, Sidekiq::PriorityQueue::Scripts::ZPOPMIN_SADD)
      conn.evalsha(@script_sha, [queue, wip_queue])
    end
  rescue StandardError => e
    # Matching on the message keeps this independent of the Redis client's
    # error class; the retry is bounded to a single reload.
    raise if reloaded || !e.message.to_s.start_with?('NOSCRIPT')

    @script_sha = nil
    reloaded = true
    retry
  end
end