Class: Aikido::Zen::Worker Private
- Inherits:
-
Object
- Object
- Aikido::Zen::Worker
- Defined in:
- lib/aikido/zen/worker.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
The worker manages the background thread in which Zen communicates with the Aikido server.
Instance Attribute Summary collapse
- #executor ⇒ Concurrent::ExecutorService readonly private
Instance Method Summary collapse
-
#delay(interval, &task) ⇒ void
private
Queue a block to be run asynchronously after a delay.
-
#every(interval, run_now: true, &task) ⇒ void
private
Queue a block to run repeatedly on a timer on the background thread.
-
#initialize(config: Aikido::Zen.config) ⇒ Worker
constructor
private
A new instance of Worker.
-
#perform(&block) ⇒ void
private
Queue a block to be run asynchronously in the background thread.
-
#shutdown ⇒ void
private
Safely clean up and kill the thread, giving time to kill any ongoing tasks on the queue.
Constructor Details
#initialize(config: Aikido::Zen.config) ⇒ Worker
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of Worker.
17 18 19 20 21 22 |
# File 'lib/aikido/zen/worker.rb', line 17 def initialize(config: Aikido::Zen.config) @config = config @timers = [] @deferrals = [] @executor = Concurrent::SingleThreadExecutor.new end |
Instance Attribute Details
#executor ⇒ Concurrent::ExecutorService (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
12 13 14 |
# File 'lib/aikido/zen/worker.rb', line 12 def executor @executor end |
Instance Method Details
#delay(interval, &task) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Queue a block to be run asynchronously after a delay.
39 40 41 42 43 |
# File 'lib/aikido/zen/worker.rb', line 39 def delay(interval, &task) Concurrent::ScheduledTask .execute(interval, executor: executor) { perform(&task) } .tap { |deferral| @deferrals << deferral } end |
#every(interval, run_now: true, &task) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Queue a block to run repeatedly on a timer on the background thread. The timer will consider how long the block takes to run to schedule the next run. For example, if you schedule a block to run every 10 seconds, and the block itself takes 2 seconds, the second iteration will be run 8 seconds after the first one.
If the block takes longer than the given interval, the second iteration will be run immediately.
58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/aikido/zen/worker.rb', line 58 def every(interval, run_now: true, &task) Concurrent::TimerTask .execute( run_now: run_now, executor: executor, interval_type: :fixed_rate, execution_interval: interval ) { perform(&task) } .tap { |timer| @timers << timer } end |
#perform(&block) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Queue a block to be run asynchronously in the background thread.
27 28 29 30 31 32 33 |
# File 'lib/aikido/zen/worker.rb', line 27 def perform(&block) executor.post do yield rescue Exception => err # rubocop:disable Lint/RescueException @config.logger.error "Error in background worker: #{err.inspect}" end end |
#shutdown ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Safely clean up and kill the thread, giving time to kill any ongoing tasks on the queue.
75 76 77 78 79 80 |
# File 'lib/aikido/zen/worker.rb', line 75 def shutdown @deferrals.each { |task| task.cancel if task.pending? } @timers.each { |task| task.shutdown } @executor.shutdown @executor.wait_for_termination(30) end |