Class: Sidekiq::WorkSet
- Inherits: Object
  - Object
    - Sidekiq::WorkSet
- Includes: Enumerable
- Defined in: lib/sidekiq/api.rb
Overview
The WorkSet stores the work being done by this Sidekiq cluster. It tracks the process and thread working on each job.
WARNING WARNING WARNING
This is live data that can change every millisecond. If you call #size => 5 and then expect #each to be called 5 times, you’re going to have a bad time.
    works = Sidekiq::WorkSet.new
    works.size # => 2
    works.each do |process_id, thread_id, work|
      # process_id is a unique identifier per Sidekiq process
      # thread_id is a unique identifier per thread
      # work is a `Sidekiq::Work` instance that has the following accessor methods.
      # [work.queue, work.run_at, work.payload]
    end
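Because the set can change between calls, one way to keep a count and its rows consistent is to enumerate once and work from that array. The following is a minimal sketch under that assumption, not part of Sidekiq: `busy_by_queue` is a hypothetical helper, and `FakeWork` only stands in for `Sidekiq::Work` so the example runs without Redis.

```ruby
# Hypothetical helper (not part of Sidekiq): counts in-progress jobs per queue
# from a single enumeration, so the total and the per-queue rows always agree.
def busy_by_queue(works)
  snapshot = works.to_a # one pass; each entry is [process_id, thread_id, work]
  counts = Hash.new(0)
  snapshot.each { |_process_id, _thread_id, work| counts[work.queue] += 1 }
  { total: snapshot.size, by_queue: counts }
end

# Stand-in for Sidekiq::Work, used only to make this sketch runnable.
FakeWork = Struct.new(:queue, :run_at, :payload)

sample = [
  ["host-a:1:abc", "tid1", FakeWork.new("default", Time.at(100), "{}")],
  ["host-a:1:abc", "tid2", FakeWork.new("mailers", Time.at(101), "{}")],
  ["host-b:7:def", "tid1", FakeWork.new("default", Time.at(102), "{}")]
]

report = busy_by_queue(sample)
```

Against a live cluster the helper would presumably be called as `busy_by_queue(Sidekiq::WorkSet.new)`. The result still describes a single moment and may be stale by the time it is read.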
Instance Method Summary
- #each(&block) ⇒ Object
- #find_work(jid) ⇒ Sidekiq::Work (also: #find_work_by_jid)
  Find the work which represents a job with the given JID.
- #size ⇒ Object
  Note that #size is only as accurate as Sidekiq’s heartbeat, which happens every 5 seconds.
Instance Method Details
#each(&block) ⇒ Object
    # File 'lib/sidekiq/api.rb', line 1199

    def each(&block)
      results = []
      procs = nil
      all_works = nil

      Sidekiq.redis do |conn|
        procs = conn.sscan("processes").to_a.sort
        all_works = conn.pipelined do |pipeline|
          procs.each do |key|
            pipeline.hgetall("#{key}:work")
          end
        end
      end

      procs.zip(all_works).each do |key, workers|
        workers.each_pair do |tid, json|
          results << [key, tid, Sidekiq::Work.new(key, tid, Sidekiq.load_json(json))] unless json.empty?
        end
      end

      results.sort_by { |(_, _, work)| work.run_at }.each(&block)
    end
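The merge-and-sort step in #each can be tried in isolation with in-memory data. This is a rough sketch, not Sidekiq's code path: the reply hashes are hand-written, the JSON field names (`queue`, `run_at`) are assumptions made for illustration, and plain `JSON.parse` and hashes stand in for `Sidekiq.load_json` and `Sidekiq::Work`.

```ruby
require "json"

# Hand-written stand-ins for the HGETALL "<process>:work" replies, one hash
# per process key. The field layout is an assumption for this sketch.
procs = ["host-a:1:abc", "host-b:7:def"]
all_works = [
  { "tid1" => JSON.generate({ "queue" => "default", "run_at" => 300 }) },
  { "tid1" => JSON.generate({ "queue" => "mailers", "run_at" => 100 }),
    "tid2" => JSON.generate({ "queue" => "default", "run_at" => 200 }) }
]

# Pair each process key with its reply and flatten to [key, tid, work] rows.
rows = []
procs.zip(all_works).each do |key, workers|
  workers.each_pair do |tid, json|
    rows << [key, tid, JSON.parse(json)] unless json.empty?
  end
end

# Oldest job first, regardless of which process is running it.
ordered = rows.sort_by { |(_, _, work)| work["run_at"] }
order = ordered.map { |key, tid, _work| "#{key}/#{tid}" }
```

Here `order` lists the two `host-b` threads before the `host-a` thread, because sorting is by start time rather than by process key.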
#find_work(jid) ⇒ Sidekiq::Work Also known as: find_work_by_jid
Find the work which represents a job with the given JID. *This is a slow O(n) operation*. Do not use for app logic.
    # File 'lib/sidekiq/api.rb', line 1249

    def find_work(jid)
      each do |_process_id, _thread_id, work|
        job = work.job
        return work if job.jid == jid
      end
      nil
    end
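Since every #find_work call rescans the whole set, a caller that needs several JIDs at once could build a lookup table from one pass instead. The sketch below assumes that approach; `index_by_jid`, `JobStub`, and `WorkStub` are hypothetical names, and the stubs only mimic the `work.job.jid` chain used above.

```ruby
# Hypothetical helper (not part of Sidekiq): one O(n) pass builds a lookup
# table, so several JIDs can be checked without rescanning the set each time.
def index_by_jid(works)
  works.each_with_object({}) do |(_process_id, _thread_id, work), index|
    index[work.job.jid] = work
  end
end

# Stand-ins for Sidekiq::Work and its job record, for this sketch only.
JobStub = Struct.new(:jid)
WorkStub = Struct.new(:job)

sample = [
  ["host-a:1:abc", "tid1", WorkStub.new(JobStub.new("jid-1"))],
  ["host-b:7:def", "tid1", WorkStub.new(JobStub.new("jid-2"))]
]

index = index_by_jid(sample)
found = index["jid-2"]     # the matching work entry
missing = index["jid-404"] # nil, mirroring find_work's return value
```

The table is a snapshot like any other read of this set, so it is suited to reporting and debugging rather than app logic.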
#size ⇒ Object
Note that #size is only as accurate as Sidekiq’s heartbeat, which happens every 5 seconds. It is NOT real-time.
This is not very efficient if you have lots of Sidekiq processes, but the alternative is a global counter, which can easily get out of sync when processes crash.
    # File 'lib/sidekiq/api.rb', line 1228

    def size
      Sidekiq.redis do |conn|
        procs = conn.sscan("processes").to_a
        if procs.empty?
          0
        else
          conn.pipelined { |pipeline|
            procs.each do |key|
              pipeline.hget(key, "busy")
            end
          }.sum(&:to_i)
        end
      end
    end
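The final `.sum(&:to_i)` can be illustrated without Redis. In this small sketch the reply array is made up; the `nil` entry models an HGET that finds no `busy` field, which is an assumption about what may come back rather than something the source states.

```ruby
# Made-up HGET <process> "busy" replies: Redis returns strings, or nil when
# the key or field does not exist.
busy_replies = ["3", nil, "0", "2"]

# nil.to_i is 0, so a missing entry contributes nothing to the total.
total = busy_replies.sum(&:to_i)
```

Each per-process `busy` value is only refreshed by the heartbeat, so the total inherits that delay.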