Module: Sidekiq::LimitFetch::Global::Monitor

Extended by:
Monitor
Included in:
Monitor
Defined in:
lib/sidekiq/limit_fetch/global/monitor.rb

Constant Summary collapse

HEARTBEAT_PREFIX =
'limit:heartbeat:'
PROCESS_SET =
'limit:processes'
HEARTBEAT_TTL =
20
REFRESH_TIMEOUT =
5

Instance Method Summary collapse

Instance Method Details

#all_processesObject



24
25
26
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 24

def all_processes
  Sidekiq.redis {|it| it.smembers PROCESS_SET }
end

#handle_dynamic_queuesObject



40
41
42
43
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 40

def handle_dynamic_queues
  queues = Sidekiq::LimitFetch::Queues
  queues.handle Sidekiq::Queue.all.map(&:name) if queues.dynamic?
end

#old_processesObject



28
29
30
31
32
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 28

def old_processes
  all_processes.reject do |process|
    Sidekiq.redis {|it| it.get heartbeat_key process }
  end
end

#remove_old_processes!Object



34
35
36
37
38
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 34

def remove_old_processes!
  Sidekiq.redis do |it|
    old_processes.each {|process| it.srem PROCESS_SET, [process] }
  end
end

#start!(ttl = HEARTBEAT_TTL, timeout = REFRESH_TIMEOUT) ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 10

def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT)
  Thread.new do
    loop do
      Sidekiq::LimitFetch.redis_retryable do
        handle_dynamic_queues
        update_heartbeat ttl
        invalidate_old_processes
      end

      sleep timeout
    end
  end
end