Class: Autoscaler::Sidekiq::ThreadServer

Inherits:
Object
  • Object
show all
Defined in:
lib/autoscaler/sidekiq/thread_server.rb

Overview

Sidekiq server middleware spawns a thread to monitor the sidekiq server for scale-down

Instance Method Summary collapse

Constructor Details

#initialize(scaler, timeout, specified_queues = nil) ⇒ ThreadServer

Returns a new instance of ThreadServer.

Parameters:

  • scaler (scaler)

    object that actually performs scaling operations (e.g. HerokuPlatformScaler)

  • timeout (Strategy, Numeric)

    strategy object that determines target workers, or a timeout in seconds to be passed to DelayedShutdown+BinaryScalingStrategy

  • specified_queues (Array[String]) (defaults to: nil)

    list of queues to monitor to determine if there is work left. Defaults to all sidekiq queues.



14
15
16
17
18
19
20
# File 'lib/autoscaler/sidekiq/thread_server.rb', line 14

def initialize(scaler, timeout, specified_queues = nil)
  @scaler = scaler
  @strategy = strategy(timeout)
  @system = QueueSystem.new(specified_queues)
  @mutex = Mutex.new
  @done = false
end

Instance Method Details

#call(worker, msg, queue, _ = nil) ⇒ Object

Sidekiq middleware api entry point



23
24
25
26
27
28
# File 'lib/autoscaler/sidekiq/thread_server.rb', line 23

def call(worker, msg, queue, _ = nil)
  yield
ensure
  active_now!
  wait_for_downscale
end

#run(interval = 15) ⇒ Object

Thread core loop Periodically update the desired number of workers

Parameters:

  • interval (Numeric) (defaults to: 15)

    polling interval, mostly for testing



44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/autoscaler/sidekiq/thread_server.rb', line 44

def run(interval = 15)
  active_now!

  workers = :unknown

  begin
    sleep(interval)
    target_workers = @strategy.call(@system, idle_time)
    workers = @scaler.workers = target_workers unless workers == target_workers
  end while !@done && workers > 0
  ::Sidekiq::ProcessSet.new.each(&:quiet!)
end

#terminateObject

Shut down the thread, pause until complete



58
59
60
61
62
63
64
65
# File 'lib/autoscaler/sidekiq/thread_server.rb', line 58

def terminate
  @done = true
  if @thread
    t = @thread
    @thread = nil
    t.value
  end
end

#wait_for_downscaleObject

Start the monitoring thread if it’s not running



31
32
33
34
35
36
37
38
39
# File 'lib/autoscaler/sidekiq/thread_server.rb', line 31

def wait_for_downscale
  @thread ||= Thread.new do
    begin
      run
    rescue
      @thread = nil
    end
  end
end