Class: Sidekiq::HerokuAutoscale::QueueSystem

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/heroku_autoscale/queue_system.rb

Constant Summary collapse

ALL_QUEUES =
'*'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(watch_queues: ALL_QUEUES, include_retrying: true, include_scheduled: true) ⇒ QueueSystem

Returns a new instance of QueueSystem.



11
12
13
14
15
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 11

def initialize(watch_queues: ALL_QUEUES, include_retrying: true, include_scheduled: true)
  @watch_queues = [watch_queues].flatten.uniq
  @include_retrying = include_retrying
  @include_scheduled = include_scheduled
end

Instance Attribute Details

#include_retryingObject

Returns the value of attribute include_retrying.



9
10
11
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 9

def include_retrying
  @include_retrying
end

#include_scheduledObject

Returns the value of attribute include_scheduled.



9
10
11
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 9

def include_scheduled
  @include_scheduled
end

#watch_queuesObject

Returns the value of attribute watch_queues.



9
10
11
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 9

def watch_queues
  @watch_queues
end

Instance Method Details

#all_queues?Boolean

Returns:

  • (Boolean)


17
18
19
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 17

def all_queues?
  @watch_queues.first == ALL_QUEUES
end

#dynosObject

number of dynos (process instances) running sidekiq this may include one-or-more instances of one-or-more heroku process types (though they should all be one process type if setup validation was observed)



24
25
26
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 24

def dynos
  sidekiq_processes.size
end

#enqueuedObject

number of jobs sitting in the active work queue



38
39
40
41
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 38

def enqueued
  counts = all_queues? ? sidekiq_queues.values : sidekiq_queues.slice(*watch_queues).values
  counts.map(&:to_i).reduce(&:+) || 0
end

#has_work?Boolean

Returns:

  • (Boolean)


59
60
61
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 59

def has_work?
  total_work > 0
end

#quietdown!(scale) ⇒ Object

When scaling down workers, heroku stops the one with the highest number… from stackoverflow.com/questions/25215334/scale-down-specific-heroku-worker-dynos



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 65

def quietdown!(scale)
  quieted = false
  # processes have hostnames formatted as "worker.1", "worker.2", "sidekiq.1", etc...
  # this groups processes by type, then sorts by number, and then quiets beyond scale.
  sidekiq_processes.group_by { |p| p['hostname'].split('.').first }.each_pair do |type, group|
    # there should only ever be a single group here (assuming setup validations were observed)
    group.sort_by { |p| p['hostname'].split('.').last.to_i }.each_with_index do |process, index|
      if index + 1 > scale && !process.stopping?
        process.quiet!
        quieted = true
      end
    end
  end

  quieted
end

#retryingObject

number of jobs in the retry set



50
51
52
53
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 50

def retrying
  return 0 unless @include_retrying
  count_jobs(::Sidekiq::RetrySet.new)
end

#scheduledObject

number of jobs in the scheduled set



44
45
46
47
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 44

def scheduled
  return 0 unless @include_scheduled
  count_jobs(::Sidekiq::ScheduledSet.new)
end

#sidekiq_processesObject



86
87
88
89
90
91
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 86

def sidekiq_processes
  process_set = ::Sidekiq::ProcessSet.new
  # select all processes with queues that intersect watched queues
  process_set = process_set.select { |p| (p['queues'] & @watch_queues).any? } unless all_queues?
  process_set
end

#sidekiq_queuesObject



82
83
84
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 82

def sidekiq_queues
  ::Sidekiq::Stats.new.queues
end

#threadsObject

number of worker threads currently running sidekiq jobs counts all queue-specific threads across all dynos (process instances)



30
31
32
33
34
35
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 30

def threads
  # work => { 'queue' => name, 'run_at' => timestamp, 'payload' => msg }
  worker_set = ::Sidekiq::Workers.new.to_a
  worker_set = worker_set.select { |pid, tid, work| watch_queues.include?(work['queue']) } unless all_queues?
  worker_set.length
end

#total_workObject



55
56
57
# File 'lib/sidekiq/heroku_autoscale/queue_system.rb', line 55

def total_work
  enqueued + scheduled + retrying + threads
end