Module: Sidekiq::LimitFetch::Global::Monitor

Extended by:
Monitor
Includes:
Redis
Included in:
Monitor
Defined in:
lib/sidekiq/limit_fetch/global/monitor.rb

Constant Summary collapse

HEARTBEAT_PREFIX =
'limit:heartbeat:'
PROCESS_SET =
'limit:processes'
HEARTBEAT_TTL =
20
REFRESH_TIMEOUT =
5

Instance Method Summary collapse

Methods included from Redis

#determine_namespace, #nonblocking_redis, #redis

Instance Method Details

#add_dynamic(queues) ⇒ Object



38
39
40
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 38

# Adds the name of every queue reported by Sidekiq::Queue.all to +queues+.
#
# @param queues [#add] collection that receives the array of queue names
# @return [Object] whatever +queues.add+ returns
def add_dynamic(queues)
  queue_names = Sidekiq::Queue.all.map { |queue| queue.name }
  queues.add(queue_names)
end

#all_processes ⇒ Object



22
23
24
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 22

# Fetches every member of the PROCESS_SET Redis set.
#
# @return [Object] the result of SMEMBERS on PROCESS_SET, as returned by
#   the +redis+ helper
def all_processes
  redis do |conn|
    conn.smembers(PROCESS_SET)
  end
end

#old_processes ⇒ Object



26
27
28
29
30
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 26

# Lists the registered processes that currently have no heartbeat value.
#
# Each entry of +all_processes+ is looked up individually with GET on its
# heartbeat key; entries whose lookup yields nil/false are kept.
#
# @return [Array] processes from +all_processes+ without a heartbeat value
def old_processes
  all_processes.select do |process|
    heartbeat = redis { |conn| conn.get(heartbeat_key(process)) }
    !heartbeat
  end
end

#remove_old_processes! ⇒ Object



32
33
34
35
36
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 32

# Removes every process reported by +old_processes+ from the PROCESS_SET
# Redis set, issuing one SREM per process.
#
# @return [Object] the collection returned by +old_processes+ (the result
#   of iterating it with +each+)
def remove_old_processes!
  redis do |conn|
    stale = old_processes
    stale.each { |name| conn.srem(PROCESS_SET, name) }
  end
end

#start!(queues, ttl = HEARTBEAT_TTL, timeout = REFRESH_TIMEOUT) ⇒ Object



11
12
13
14
15
16
17
18
19
20
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 11

# Spawns the background monitor thread.
#
# Every iteration the thread:
# 1. calls +add_dynamic+ with +queues+ when +queues.dynamic?+ is truthy,
# 2. calls +update_heartbeat+ with +ttl+,
# 3. calls +invalidate_old_processes+,
# and then sleeps for +timeout+ seconds.
#
# An error raised by one iteration is reported on stderr and the loop
# carries on with the next iteration after the usual sleep.
#
# @param queues [#dynamic?] queue collection handed to +add_dynamic+
# @param ttl [Numeric] value passed to +update_heartbeat+
# @param timeout [Numeric] seconds to sleep between iterations
# @return [Thread] the monitor thread
def start!(queues, ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT)
  Thread.new do
    loop do
      begin
        add_dynamic queues if queues.dynamic?
        update_heartbeat ttl
        invalidate_old_processes
      rescue StandardError => e
        # Without this rescue a single failure (e.g. a transient connection
        # error) would terminate the thread, and update_heartbeat would
        # never run again for the lifetime of the process.
        warn "sidekiq-limit_fetch monitor: #{e.class}: #{e.message}"
      end
      sleep timeout
    end
  end
end