Class: Karafka::Connection::Conductor

Inherits:
Object
Defined in:
lib/karafka/connection/conductor.rb

Overview

Conductor is responsible for the time orchestration of the listeners manager. It blocks while the manager is not needed, that is, while no state change has occurred that could require changes to any listener's configuration. It unblocks when something changes or when a certain amount of time has passed. The time-based unblocking makes it possible to build complex, state-aware managers.
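The block-then-wake behaviour described above can be sketched in isolation. This is a minimal stand-in, not Karafka's class: `SketchConductor`, the 200 ms interval, and the manager thread are assumptions made for illustration, and the sketch assumes Ruby 3.2 or newer so that `Queue#pop` accepts `timeout:`.

```ruby
# Hypothetical stand-in for the conductor; assumes Ruby 3.2+ (Queue#pop with timeout:).
class SketchConductor
  def initialize(max_interval = 30_000)
    @lock = Queue.new
    @timeout = max_interval / 1_000.0 # milliseconds to seconds
  end

  # Blocks until signaled or until the timeout elapses (returns nil on timeout)
  def wait
    @lock.pop(timeout: @timeout)
  end

  # Wakes up a waiting (or the next) call to #wait
  def signal
    @lock << true
  end
end

conductor = SketchConductor.new(200)
wakeups = []

# Hypothetical manager loop: wakes up twice and records why it woke up
manager = Thread.new do
  2.times do
    wakeups << (conductor.wait ? :signaled : :timed_out)
  end
end

conductor.signal # simulate a state change
manager.join

p wakeups
```

The first wake-up is caused by the signal, and the second happens only because the interval elapsed with no further state change.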

Instance Method Summary

Constructor Details

#initialize(max_interval = 30_000) ⇒ Conductor

Returns a new instance of Conductor.

Parameters:

  • max_interval (Integer) (defaults to: 30_000)

    number of milliseconds of inactivity after which the manager is woken up even though no state change occurred



# File 'lib/karafka/connection/conductor.rb', line 12

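def initialize(max_interval = 30_000)
  # Queue#pop accepts timeout: only from Ruby 3.2 on; older versions use TimedQueue
  @lock = RUBY_VERSION < '3.2' ? Processing::TimedQueue.new : Queue.new
  # Convert milliseconds to (float) seconds
  @timeout = max_interval / 1_000.0
end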
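The constructor falls back to `Processing::TimedQueue` on Ruby versions older than 3.2, where `Queue#pop` has no `timeout:` keyword. The source of that class is not shown here, so the following is only a sketch of how such a queue could be built from a `Mutex` and a `ConditionVariable`. `SketchTimedQueue` is a hypothetical name and this is not Karafka's implementation.

```ruby
# Hypothetical timed queue; NOT Karafka's Processing::TimedQueue.
class SketchTimedQueue
  def initialize
    @items = []
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  # Adds an item and wakes up one waiting consumer
  def <<(item)
    @mutex.synchronize do
      @items << item
      @cond.signal
    end
    self
  end

  # Returns the next item, or nil when nothing arrives within `timeout` seconds
  def pop(timeout:)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

    @mutex.synchronize do
      while @items.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0

        # Re-check the queue after every wake-up to guard against spurious ones
        @cond.wait(@mutex, remaining)
      end

      @items.shift
    end
  end
end

queue = SketchTimedQueue.new
queue << true
first = queue.pop(timeout: 0.05)  # an item is already queued
second = queue.pop(timeout: 0.05) # nothing queued, gives up after about 50 ms
```

Here `first` holds the queued value and `second` is `nil`, which mirrors the interface the conductor relies on: `<<` to signal and `pop(timeout:)` to wait.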

Instance Method Details

#signal ⇒ Object

Releases wait lock on state change



# File 'lib/karafka/connection/conductor.rb', line 23

def signal
  @lock << true
end

#wait ⇒ Object

Waits in a blocking way until it is time to manage listeners



# File 'lib/karafka/connection/conductor.rb', line 18

def wait
  @lock.pop(timeout: @timeout)
end
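Because `signal` pushes onto a queue rather than toggling a flag, signals sent while nobody is waiting are buffered and consumed by later `wait` calls. The snippet below illustrates this with a plain `Queue`. It assumes Ruby 3.2 or newer, and the 100 ms interval is an arbitrary value chosen for the example.

```ruby
# Assumes Ruby 3.2+ (Queue#pop with timeout:); values are illustrative only.
lock = Queue.new
timeout = 100 / 1_000.0 # 100 ms expressed in seconds

# Two state changes are signaled before anyone waits
2.times { lock << true }

# The first two pops return immediately, the third one times out
results = 3.times.map { lock.pop(timeout: timeout) }

p results
```

A consequence of this design is that a burst of state changes results in several consecutive wake-ups of the manager instead of a single one.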