Class: Sidekiq::HerokuAutoscale::QueueSystem
- Inherits:
-
Object
- Object
- Sidekiq::HerokuAutoscale::QueueSystem
- Defined in:
- lib/sidekiq/heroku_autoscale/queue_system.rb
Constant Summary collapse
- ALL_QUEUES =
'*'.freeze
Instance Attribute Summary collapse
-
#include_retrying ⇒ Object
Returns the value of attribute include_retrying.
-
#include_scheduled ⇒ Object
Returns the value of attribute include_scheduled.
-
#watch_queues ⇒ Object
Returns the value of attribute watch_queues.
Instance Method Summary collapse
- #all_queues? ⇒ Boolean
-
#dynos ⇒ Object
number of dynos (process instances) running sidekiq this may include one-or-more instances of one-or-more heroku process types (though they should all be one process type if setup validation was observed).
-
#enqueued ⇒ Object
number of jobs sitting in the active work queue.
- #has_work? ⇒ Boolean
-
#initialize(watch_queues: ALL_QUEUES, include_retrying: true, include_scheduled: true) ⇒ QueueSystem
constructor
A new instance of QueueSystem.
-
#quietdown!(scale) ⇒ Object
When scaling down workers, heroku stops the one with the highest number…
-
#retrying ⇒ Object
number of jobs in the retry set.
-
#scheduled ⇒ Object
number of jobs in the scheduled set.
- #sidekiq_processes ⇒ Object
- #sidekiq_queues ⇒ Object
-
#threads ⇒ Object
number of worker threads currently running sidekiq jobs counts all queue-specific threads across all dynos (process instances).
- #total_work ⇒ Object
Constructor Details
#initialize(watch_queues: ALL_QUEUES, include_retrying: true, include_scheduled: true) ⇒ QueueSystem
Returns a new instance of QueueSystem.
11 12 13 14 15 |
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 11 def initialize(watch_queues: ALL_QUEUES, include_retrying: true, include_scheduled: true) @watch_queues = [watch_queues].flatten.uniq @include_retrying = @include_scheduled = include_scheduled end |
Instance Attribute Details
#include_retrying ⇒ Object
Returns the value of attribute include_retrying.
9 10 11 |
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 9 def @include_retrying end |
#include_scheduled ⇒ Object
Returns the value of attribute include_scheduled.
9 10 11 |
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 9 def include_scheduled @include_scheduled end |
#watch_queues ⇒ Object
Returns the value of attribute watch_queues.
9 10 11 |
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 9 def watch_queues @watch_queues end |
Instance Method Details
#all_queues? ⇒ Boolean
17 18 19 |
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 17 def all_queues? @watch_queues.first == ALL_QUEUES end |
#dynos ⇒ Object
number of dynos (process instances) running sidekiq this may include one-or-more instances of one-or-more heroku process types (though they should all be one process type if setup validation was observed)
24 25 26 |
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 24 def dynos sidekiq_processes.size end |
#enqueued ⇒ Object
number of jobs sitting in the active work queue
38 39 40 41 |
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 38 def enqueued counts = all_queues? ? sidekiq_queues.values : sidekiq_queues.slice(*watch_queues).values counts.map(&:to_i).reduce(&:+) || 0 end |
#has_work? ⇒ Boolean
59 60 61 |
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 59 def has_work? total_work > 0 end |
#quietdown!(scale) ⇒ Object
When scaling down workers, heroku stops the one with the highest number… from stackoverflow.com/questions/25215334/scale-down-specific-heroku-worker-dynos
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 65 def quietdown!(scale) quieted = false # processes have hostnames formatted as "worker.1", "worker.2", "sidekiq.1", etc... # this groups processes by type, then sorts by number, and then quiets beyond scale. sidekiq_processes.group_by { |p| p['hostname'].split('.').first }.each_pair do |type, group| # there should only ever be a single group here (assuming setup validations were observed) group.sort_by { |p| p['hostname'].split('.').last.to_i }.each_with_index do |process, index| if index + 1 > scale && !process.stopping? process.quiet! quieted = true end end end quieted end |
#retrying ⇒ Object
number of jobs in the retry set
50 51 52 53 |
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 50 def return 0 unless @include_retrying count_jobs(::Sidekiq::RetrySet.new) end |
#scheduled ⇒ Object
number of jobs in the scheduled set
44 45 46 47 |
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 44 def scheduled return 0 unless @include_scheduled count_jobs(::Sidekiq::ScheduledSet.new) end |
#sidekiq_processes ⇒ Object
86 87 88 89 90 91 |
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 86 def sidekiq_processes process_set = ::Sidekiq::ProcessSet.new # select all processes with queues that intersect watched queues process_set = process_set.select { |p| (p['queues'] & @watch_queues).any? } unless all_queues? process_set end |
#sidekiq_queues ⇒ Object
82 83 84 |
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 82 def sidekiq_queues ::Sidekiq::Stats.new.queues end |
#threads ⇒ Object
number of worker threads currently running sidekiq jobs counts all queue-specific threads across all dynos (process instances)
30 31 32 33 34 35 |
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 30 def threads # work => { 'queue' => name, 'run_at' => timestamp, 'payload' => msg } worker_set = ::Sidekiq::Workers.new.to_a worker_set = worker_set.select { |pid, tid, work| watch_queues.include?(work['queue']) } unless all_queues? worker_set.length end |
#total_work ⇒ Object
55 56 57 |
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 55 def total_work enqueued + scheduled + + threads end |