Class: Sidekiq::ProcessSet

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/sidekiq/api.rb

Overview

Enumerates the set of Sidekiq processes which are actively working right now. Each process send a heartbeat to Redis every 5 seconds so this set should be relatively accurate, barring network partitions.

Yields a Sidekiq::Process.

Instance Method Summary collapse

Instance Method Details

#each(&block) ⇒ Object



524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
# File 'lib/sidekiq/api.rb', line 524

def each(&block)
  procs = Sidekiq.redis { |conn| conn.smembers('processes') }

  to_prune = []
  sorted = procs.sort
  Sidekiq.redis do |conn|
    # We're making a tradeoff here between consuming more memory instead of
    # making more roundtrips to Redis, but if you have hundreds or thousands of workers,
    # you'll be happier this way
    result = conn.pipelined do
      sorted.each do |key|
        conn.hmget(key, 'info', 'busy', 'beat')
      end
    end

    result.each_with_index do |(info, busy, at_s), i|
      # the hash named key has an expiry of 60 seconds.
      # if it's not found, that means the process has not reported
      # in to Redis and probably died.
      (to_prune << sorted[i]; next) if info.nil?
      hash = Sidekiq.load_json(info)
      yield Process.new(hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f))
    end
  end

  Sidekiq.redis {|conn| conn.srem('processes', to_prune) } unless to_prune.empty?
  nil
end

#sizeObject

This method is not guaranteed accurate since it does not prune the set based on current heartbeat. #each does that and ensures the set only contains Sidekiq processes which have sent a heartbeat within the last 60 seconds.



557
558
559
# File 'lib/sidekiq/api.rb', line 557

def size
  Sidekiq.redis { |conn| conn.scard('processes') }
end