Class: Sidekiq::ProcessSet
- Inherits:
-
Object
- Object
- Sidekiq::ProcessSet
- Includes:
- Enumerable
- Defined in:
- lib/sidekiq/api.rb
Overview
Enumerates the set of Sidekiq processes which are actively working right now. Each process send a heartbeat to Redis every 5 seconds so this set should be relatively accurate, barring network partitions.
Yields a hash of data which looks something like this:
'hostname' => 'app-1.example.com',
'started_at' => <process start time>,
'pid' => 12345,
'tag' => 'myapp'
'concurrency' => 25,
'queues' => ['default', 'low'],
'busy' => 10,
'beat' => <last heartbeat>,
Instance Method Summary collapse
- #each(&block) ⇒ Object
-
#size ⇒ Object
This method is not guaranteed accurate since it does not prune the set based on current heartbeat.
Instance Method Details
#each(&block) ⇒ Object
468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 |
# File 'lib/sidekiq/api.rb', line 468 def each(&block) procs = Sidekiq.redis { |conn| conn.smembers('processes') } to_prune = [] sorted = procs.sort Sidekiq.redis do |conn| # We're making a tradeoff here between consuming more memory instead of # making more roundtrips to Redis, but if you have hundreds or thousands of workers, # you'll be happier this way result = conn.pipelined do sorted.each do |key| conn.hmget(key, 'info', 'busy', 'beat') end end result.each_with_index do |(info, busy, at_s), i| # the hash named key has an expiry of 60 seconds. # if it's not found, that means the process has not reported # in to Redis and probably died. (to_prune << sorted[i]; next) if info.nil? hash = Sidekiq.load_json(info) yield hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f) end end Sidekiq.redis {|conn| conn.srem('processes', to_prune) } unless to_prune.empty? nil end |
#size ⇒ Object
This method is not guaranteed accurate since it does not prune the set based on current heartbeat. #each does that and ensures the set only contains Sidekiq processes which have sent a heartbeat within the last 60 seconds.
501 502 503 |
# File 'lib/sidekiq/api.rb', line 501 def size Sidekiq.redis { |conn| conn.scard('processes') } end |