Module: Sidekiq::LimitFetch::Global::Monitor

Extended by:
Monitor
Included in:
Monitor
Defined in:
lib/sidekiq/limit_fetch/global/monitor.rb

Constant Summary collapse

HEARTBEAT_PREFIX =
'limit:heartbeat:'
PROCESS_SET =
'limit:processes'
HEARTBEAT_TTL =
20
REFRESH_TIMEOUT =
5

Instance Method Summary collapse

Instance Method Details

#all_processes ⇒ Object



28
29
30
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 28

# Lists every process identifier currently registered in the Redis set
# stored under PROCESS_SET.
#
# @return [Object] whatever `smembers` yields for PROCESS_SET
#   (presumably an Array of process id strings — confirm against the Redis client)
def all_processes
  Sidekiq.redis do |conn|
    conn.smembers(PROCESS_SET)
  end
end

#handle_dynamic_queues ⇒ Object



44
45
46
47
48
49
50
51
52
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 44

# When dynamic queue mode is enabled, collects the names of all Sidekiq
# queues, drops those listed in the dynamic exclusion list, and passes the
# remainder to Sidekiq::LimitFetch::Queues.handle.
#
# @return [Object, nil] the result of `Queues.handle`, or nil when dynamic
#   mode is off
def handle_dynamic_queues
  registry = Sidekiq::LimitFetch::Queues
  return unless registry.dynamic?

  queue_names = Sidekiq::Queue.all.map(&:name)
  registry.handle(queue_names.reject { |name| registry.dynamic_exclude.include?(name) })
end

#old_processes ⇒ Object



32
33
34
35
36
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 32

# Selects the registered processes whose heartbeat key does not currently
# hold the value '1'.
#
# Each process is checked with its own Redis GET on the key produced by
# `heartbeat_key`.
#
# @return [Object] the subset of `all_processes` without a '1' heartbeat
def old_processes
  all_processes.reject do |process_id|
    heartbeat = Sidekiq.redis { |conn| conn.get(heartbeat_key(process_id)) }
    heartbeat == '1'
  end
end

#remove_old_processes! ⇒ Object



38
39
40
41
42
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 38

# Removes every process reported by `old_processes` from the Redis set
# stored under PROCESS_SET, issuing one SREM per process.
#
# @return [Object] the value of the `Sidekiq.redis` block, i.e. the
#   collection returned by `old_processes`
def remove_old_processes!
  Sidekiq.redis do |conn|
    old_processes.each do |stale_id|
      conn.srem(PROCESS_SET, [stale_id])
    end
  end
end

#start!(ttl = HEARTBEAT_TTL, timeout = REFRESH_TIMEOUT) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 14

# Starts a background thread that runs the monitor's maintenance cycle
# forever.
#
# Each cycle runs inside Sidekiq::LimitFetch.redis_retryable and, in order,
# calls `handle_dynamic_queues`, `update_heartbeat(ttl)` and
# `invalidate_old_processes`; the thread then sleeps for `timeout` before the
# next cycle.
#
# @param ttl [Integer] value forwarded to `update_heartbeat`
#   (presumably the heartbeat key expiry in seconds — confirm in update_heartbeat)
# @param timeout [Numeric] seconds to sleep between cycles
# @return [Thread] the spawned thread
def start!(ttl = HEARTBEAT_TTL, timeout = REFRESH_TIMEOUT)
  Thread.new do
    loop do
      Sidekiq::LimitFetch.redis_retryable do
        handle_dynamic_queues
        update_heartbeat ttl
        invalidate_old_processes
      end

      # Sleep outside the retryable block so the pause is not part of the
      # Redis-wrapped work.
      sleep timeout
    end
  end
end