Class: Aikido::Zen::Worker Private

Inherits:
Object
Defined in:
lib/aikido/zen/worker.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

The worker manages the background thread in which Zen communicates with the Aikido server.

Constructor Details

#initialize(config: Aikido::Zen.config) ⇒ Worker

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of Worker.



# File 'lib/aikido/zen/worker.rb', line 17

def initialize(config: Aikido::Zen.config)
  @config = config
  @timers = []
  @deferrals = []
  @executor = Concurrent::SingleThreadExecutor.new
end

Instance Attribute Details

#executor ⇒ Concurrent::ExecutorService (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (Concurrent::ExecutorService)

    the executor that runs queued tasks; #initialize creates it as a Concurrent::SingleThreadExecutor.


# File 'lib/aikido/zen/worker.rb', line 12

def executor
  @executor
end

Instance Method Details

#delay(interval, &task) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Queue a block to be run asynchronously after a delay.

Parameters:

  • interval (Integer)

    number of seconds to wait before running the block.



# File 'lib/aikido/zen/worker.rb', line 39

def delay(interval, &task)
  Concurrent::ScheduledTask
    .execute(interval, executor: executor) { perform(&task) }
    .tap { |deferral| @deferrals << deferral }
end

#every(interval, run_now: true, &task) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Queue a block to run repeatedly on a timer on the background thread. The timer accounts for how long the block takes to run when scheduling the next run. For example, if you schedule a block to run every 10 seconds and the block itself takes 2 seconds, the second iteration starts 8 seconds after the first one finishes, so that consecutive runs start 10 seconds apart.

If the block takes longer than the given interval, the next iteration runs immediately.
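
The timing rule described above can be modeled in a few lines of plain Ruby. This is only a sketch of the documented behavior, not code taken from Aikido::Zen or concurrent-ruby; `next_delay` is a hypothetical helper introduced here for illustration.

```ruby
# Hypothetical helper (not part of Aikido::Zen or concurrent-ruby).
# Given the configured interval and how long the last run took, return
# how many seconds to wait after that run finishes before starting the
# next one. The wait never goes below zero.
def next_delay(interval, elapsed)
  [interval - elapsed, 0].max
end

puts next_delay(10, 2)   # prints 8: the run took 2s, so wait the remaining 8s
puts next_delay(10, 12)  # prints 0: the run overran the interval, so run again immediately
```

With a fixed-rate timer the interval measures start-to-start time, which is why the duration of the block is subtracted from the wait.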

Parameters:

  • interval (Integer)

    number of seconds between the start of one run and the start of the next.

  • run_now (Boolean) (defaults to: true)

    whether to run the block immediately, or wait for interval seconds before the first run.



# File 'lib/aikido/zen/worker.rb', line 58

def every(interval, run_now: true, &task)
  Concurrent::TimerTask
    .execute(
      run_now: run_now,
      executor: executor,
      interval_type: :fixed_rate,
      execution_interval: interval
    ) {
      perform(&task)
    }
    .tap { |timer| @timers << timer }
end

#perform(&block) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Queue a block to be run asynchronously in the background thread.



# File 'lib/aikido/zen/worker.rb', line 27

def perform(&block)
  executor.post do
    yield
  rescue Exception => err # rubocop:disable Lint/RescueException
    @config.logger.error "Error in background worker: #{err.inspect}"
  end
end
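
The pattern in the listing above (one background thread, with every task wrapped so that an exception is logged instead of killing the thread) can be illustrated using only the Ruby standard library. The sketch below is a simplified stand-in, not the Aikido::Zen implementation: `MiniWorker` is a hypothetical class, and a `Thread` plus `Queue` take the place of `Concurrent::SingleThreadExecutor`.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in for illustration only; the real Worker delegates
# to a Concurrent::SingleThreadExecutor instead of managing a Thread.
class MiniWorker
  def initialize(logger)
    @logger = logger
    @queue = Queue.new
    @thread = Thread.new do
      # Queue#pop returns nil once the queue is closed and drained,
      # which ends the loop and lets the thread exit.
      while (job = @queue.pop)
        begin
          job.call
        rescue Exception => err # rubocop:disable Lint/RescueException
          # Log and keep going so one failing task does not stop the worker.
          @logger.error "Error in background worker: #{err.inspect}"
        end
      end
    end
  end

  # Queue a block to run on the background thread.
  def perform(&block)
    @queue << block
  end

  # Stop accepting work, then wait up to `timeout` seconds for queued
  # tasks to finish.
  def shutdown(timeout = 30)
    @queue.close
    @thread.join(timeout)
  end
end

log = StringIO.new
worker = MiniWorker.new(Logger.new(log))
results = []

worker.perform { raise "boom" }    # logged, does not stop the worker
worker.perform { results << :ok }  # still runs after the failure
worker.shutdown
```

After `shutdown` returns, `results` holds `:ok` and the log contains the error from the first task, which shows that a failing task does not prevent later tasks from running.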

#shutdown ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

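Safely clean up and stop the background thread. Pending deferred tasks are cancelled, timers are shut down, and the executor then waits up to 30 seconds for tasks already on the queue to finish.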



# File 'lib/aikido/zen/worker.rb', line 75

def shutdown
  @deferrals.each { |task| task.cancel if task.pending? }
  @timers.each { |task| task.shutdown }
  @executor.shutdown
  @executor.wait_for_termination(30)
end