Module: Sidekiq::LimitFetch::Global::Monitor
Constant Summary collapse
- HEARTBEAT_PREFIX =
'limit:heartbeat:'
- PROCESS_SET =
'limit:processes'
- HEARTBEAT_TTL =
20
- REFRESH_TIMEOUT =
5
Instance Method Summary collapse
- #all_processes ⇒ Object
- #handle_dynamic_queues ⇒ Object
- #old_processes ⇒ Object
- #remove_old_processes! ⇒ Object
- #start!(ttl = HEARTBEAT_TTL, timeout = REFRESH_TIMEOUT) ⇒ Object
Instance Method Details
#all_processes ⇒ Object
28 29 30 |
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 28
# Reads every member of the PROCESS_SET Redis set.
#
# @return [Object] whatever SMEMBERS yields for PROCESS_SET through the
#   Sidekiq Redis connection (presumably an Array of process identifiers —
#   confirm against the code that populates the set).
def all_processes
  Sidekiq.redis do |conn|
    conn.smembers(PROCESS_SET)
  end
end
#handle_dynamic_queues ⇒ Object
44 45 46 47 48 49 50 51 52 |
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 44
# Passes the names of all Sidekiq queues, minus the excluded ones, to
# Sidekiq::LimitFetch::Queues.handle.
#
# Does nothing (returns nil) unless Sidekiq::LimitFetch::Queues reports
# that it is in dynamic mode.
#
# @return [Object, nil] the result of Queues.handle, or nil when dynamic
#   mode is off.
def handle_dynamic_queues
  limit_queues = Sidekiq::LimitFetch::Queues
  return unless limit_queues.dynamic?

  known_names = Sidekiq::Queue.all.map(&:name)
  # dynamic_exclude is consulted once per name, exactly as the filter needs it.
  allowed_names = known_names.reject { |name| limit_queues.dynamic_exclude.include?(name) }
  limit_queues.handle(allowed_names)
end
#old_processes ⇒ Object
32 33 34 35 36 |
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 32
# Lists the registered processes whose heartbeat is missing.
#
# A process is kept out of the result only when the value stored under its
# heartbeat key is exactly the string '1'; any other value (including nil)
# marks it as old. The key name comes from +heartbeat_key+, which is defined
# outside this snippet.
#
# @return [Array] the entries of #all_processes without a live heartbeat.
def old_processes
  all_processes.reject do |process_id|
    heartbeat = Sidekiq.redis { |conn| conn.get(heartbeat_key(process_id)) }
    heartbeat == '1'
  end
end
#remove_old_processes! ⇒ Object
38 39 40 41 42 |
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 38
# Removes every process reported by #old_processes from the PROCESS_SET
# Redis set.
#
# The stale list is computed before a connection is taken, and all members
# are removed with a single multi-member SREM instead of one round-trip per
# process. Redis is not contacted at all when nothing is stale, because SREM
# rejects an empty member list.
#
# @return [Array] the processes that were considered old (empty when there
#   was nothing to remove).
def remove_old_processes!
  stale = old_processes
  return stale if stale.empty?

  Sidekiq.redis { |conn| conn.srem(PROCESS_SET, stale) }
  stale
end
#start!(ttl = HEARTBEAT_TTL, timeout = REFRESH_TIMEOUT) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/sidekiq/limit_fetch/global/monitor.rb', line 14
# Starts the background monitor thread.
#
# The thread loops forever. On each pass it runs, inside
# Sidekiq::LimitFetch.redis_retryable, these steps in order:
# #handle_dynamic_queues, update_heartbeat(ttl), invalidate_old_processes.
# It then sleeps for +timeout+ before the next pass. The last two helpers
# are defined outside this snippet.
#
# @param ttl [Object] value forwarded to update_heartbeat; defaults to
#   HEARTBEAT_TTL (presumably a lifetime in seconds — confirm).
# @param timeout [Numeric] argument given to +sleep+ between passes;
#   defaults to REFRESH_TIMEOUT.
# @return [Thread] the thread running the loop.
def start!(ttl = HEARTBEAT_TTL, timeout = REFRESH_TIMEOUT)
  Thread.new do
    loop do
      Sidekiq::LimitFetch.redis_retryable do
        handle_dynamic_queues
        update_heartbeat(ttl)
        invalidate_old_processes
      end

      sleep(timeout)
    end
  end
end