Class: Exekutor::Worker
- Inherits:
-
Object
- Object
- Exekutor::Worker
- Includes:
- Internal::Executable
- Defined in:
- lib/exekutor/worker.rb
Overview
The job worker
Constant Summary
Constants included from Internal::Executable
Instance Attribute Summary collapse
-
#record ⇒ Object
readonly
Returns the value of attribute record.
Class Method Summary collapse
-
.start(config = {}) ⇒ Object
Creates a new worker with the specified config and immediately starts it.
Instance Method Summary collapse
-
#id ⇒ Object
The worker ID.
-
#initialize(config = {}) ⇒ Worker
constructor
Creates a new worker with the specified config.
-
#join ⇒ Object
Blocks until the worker is stopped.
-
#kill ⇒ Object
Kills the worker.
-
#last_heartbeat ⇒ Time?
The timestamp of the last heartbeat.
-
#reserve_jobs ⇒ Object
Reserves and executes jobs.
-
#start ⇒ Boolean
Starts the worker.
-
#stop ⇒ Object
Stops the worker.
-
#thread_stats ⇒ Hash
Returns the thread usage for this worker.
Methods included from Internal::Executable
#consecutive_errors, #restart_delay, #running?, #state
Constructor Details
#initialize(config = {}) ⇒ Worker
Creates a new worker with the specified config
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/exekutor/worker.rb', line 37 def initialize(config = {}) super() @config = config @record = create_record! provider_pool = create_provider_pool(config) @executor = create_executor(config) @provider = create_provider(config, @executor, provider_pool) @executables = [@executor, @provider] create_listener(provider_pool, config) if config.fetch(:enable_listener, true) create_status_server(provider_pool, config) if config[:status_server_port].to_i.positive? @executables.freeze end |
Instance Attribute Details
#record ⇒ Object (readonly)
Returns the value of attribute record.
10 11 12 |
# File 'lib/exekutor/worker.rb', line 10 def record @record end |
Class Method Details
.start(config = {}) ⇒ Object
Creates a new worker with the specified config and immediately starts it
16 17 18 |
# File 'lib/exekutor/worker.rb', line 16 def self.start(config = {}) new(config).tap(&:start) end |
Instance Method Details
#id ⇒ Object
The worker ID.
123 124 125 |
# File 'lib/exekutor/worker.rb', line 123 def id @record.id end |
#join ⇒ Object
Blocks until the worker is stopped.
109 110 111 112 113 114 115 |
# File 'lib/exekutor/worker.rb', line 109 def join @stop_event = Concurrent::Event.new Kernel.loop do @stop_event.wait 10 break unless running? end end |
#kill ⇒ Object
Kills the worker. Does not wait for any jobs to be completed.
94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/exekutor/worker.rb', line 94 def kill Thread.new do @executables.reverse_each(&:stop) @stop_event&.set if defined?(@stop_event) end @executor.kill begin @record.destroy rescue StandardError # ignored end true end |
#last_heartbeat ⇒ Time?
Returns The timestamp of the last heartbeat. The timestamp is truncated to whole minutes.
128 129 130 |
# File 'lib/exekutor/worker.rb', line 128 def last_heartbeat @record.last_heartbeat_at end |
#reserve_jobs ⇒ Object
Reserves and executes jobs.
118 119 120 |
# File 'lib/exekutor/worker.rb', line 118 def reserve_jobs @provider.poll if @provider&.running? end |
#start ⇒ Boolean
Starts the worker. Does nothing if the worker has already started.
55 56 57 58 59 60 61 62 63 64 |
# File 'lib/exekutor/worker.rb', line 55 def start return false unless compare_and_set_state(:pending, :started) Internal::Hooks.run :startup, self do @executables.each(&:start) @record.update(status: "r") end true end |
#stop ⇒ Object
Stops the worker. If wait_for_termination
is set, this method blocks until the execution thread is terminated or killed.
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/exekutor/worker.rb', line 69 def stop Internal::Hooks.run :shutdown, self do self.state = :stopped unless @record.destroyed? begin @record.update(status: "s") rescue StandardError # ignored end end @executables.reverse_each(&:stop) wait_for_termination @config[:wait_for_termination] begin @record.destroy rescue StandardError # ignored end @stop_event&.set if defined?(@stop_event) end true end |
#thread_stats ⇒ Hash
Returns the thread usage for this worker. The resulting hash will contain the following key-value pairs:
-
:minimum
, (Integer) the minimum number of threads that should be active; -
:maximum
, (Integer) the maximum number of threads may should be active; -
:available
, (Integer) the number of threads that are available to execute new jobs; -
:usage_percent
, (Float, 0-100) the percentage of workers that are currently busy executing jobs.
138 139 140 141 142 143 144 145 146 147 |
# File 'lib/exekutor/worker.rb', line 138 def thread_stats available = @executor.available_threads usage_percent = (((100 - (available * 100.0 / @executor.maximum_threads))).round(2) if @executor.running?) { minimum: @executor.minimum_threads, maximum: @executor.maximum_threads, available: available, usage_percent: usage_percent } end |