Class: Exekutor::Worker

Inherits:
Object
Includes:
Internal::Executable
Defined in:
lib/exekutor/worker.rb

Overview

The job worker

Constant Summary

Constants included from Internal::Executable

Internal::Executable::STATES

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Methods included from Internal::Executable

#consecutive_errors, #restart_delay, #running?, #state

Constructor Details

#initialize(config = {}) ⇒ Worker

Creates a new worker with the specified config

Parameters:

  • config (Hash) (defaults to: {})

    The worker configuration

Options Hash (config):

  • :identifier (String)

    the identifier for this worker

  • :queues (Array<String>)

    the queues to work on

  • :min_threads (Integer)

    the minimum number of execution threads that should be active

  • :max_threads (Integer)

    the maximum number of execution threads that may be active

  • :max_thread_idletime (ActiveSupport::Duration)

    the maximum duration a thread may be idle before being stopped

  • :polling_interval (ActiveSupport::Duration)

    the polling interval

  • :polling_jitter (Float)

    the polling jitter

  • :set_db_connection_name (Boolean)

    whether the DB connection name should be set

  • :wait_for_termination (Integer, Boolean)

    how long the worker should wait on jobs to be completed before exiting

  • :status_server_port (Integer)

    the port to run the status server on

  • :status_server_handler (String)

    the name of the Rack handler to use for the status server

  • :healthcheck_timeout (ActiveSupport::Duration)

    the timeout, in minutes, after which the healthcheck server deems a worker to be down
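
As an illustration of the options above, a configuration hash might look like the following. This is a minimal sketch: every value is a hypothetical placeholder, and the duration-typed options are left out so the snippet runs without ActiveSupport. The call that would consume the hash is shown only as a comment.

```ruby
# Hypothetical worker configuration; all values are placeholders.
config = {
  identifier: "worker-1",
  queues: %w[default mailers],
  min_threads: 1,
  max_threads: 5,
  set_db_connection_name: true,
  wait_for_termination: 30,
  status_server_port: 9000
}

# In an application with Exekutor loaded, the hash would be passed to the
# constructor or to .start (not executed here):
#   worker = Exekutor::Worker.start(config)

puts config.keys.inspect
```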



# File 'lib/exekutor/worker.rb', line 37

def initialize(config = {})
  super()
  @config = config
  @record = create_record!

  provider_pool = create_provider_pool(config)

  @executor = create_executor(config)
  @provider = create_provider(config, @executor, provider_pool)

  @executables = [@executor, @provider]
  create_listener(provider_pool, config) if config.fetch(:enable_listener, true)
  create_status_server(provider_pool, config) if config[:status_server_port].to_i.positive?
  @executables.freeze
end
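
The last lines of the constructor decide which optional components are created. The sketch below reproduces only those two conditions on plain hashes; the helper names are hypothetical and not part of Exekutor. It suggests that the listener is on unless `:enable_listener` is explicitly false, and that the status server is skipped when the port is missing or zero, because `nil.to_i` is `0`.

```ruby
# Mirrors the two guards in #initialize using plain hashes.
# Both helper names are hypothetical.
def listener_enabled?(config)
  config.fetch(:enable_listener, true)
end

def status_server_enabled?(config)
  config[:status_server_port].to_i.positive?
end

p listener_enabled?({})                                # key absent, default applies
p listener_enabled?({ enable_listener: false })        # explicitly disabled
p status_server_enabled?({})                           # nil.to_i is 0
p status_server_enabled?({ status_server_port: 9000 })
p status_server_enabled?({ status_server_port: 0 })
```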

Instance Attribute Details

#record ⇒ Object (readonly)

Returns the value of attribute record.



# File 'lib/exekutor/worker.rb', line 10

def record
  @record
end

Class Method Details

.start(config = {}) ⇒ Object

Creates a new worker with the specified config and immediately starts it

Returns:

  • The worker




# File 'lib/exekutor/worker.rb', line 16

def self.start(config = {})
  new(config).tap(&:start)
end
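
Because `.start` is written with `tap`, it returns the worker instance and not the Boolean result of `#start`. The toy class below is a hypothetical stand-in that models only this interplay. It is not Exekutor code.

```ruby
# Hypothetical stand-in for a worker; only the start/tap interplay is modelled.
class ToyWorker
  attr_reader :started

  def initialize
    @started = false
  end

  # Returns a Boolean, like Worker#start.
  def start
    return false if @started

    @started = true
  end

  def self.start
    # tap yields the new instance to #start and then returns the instance itself
    new.tap(&:start)
  end
end

worker = ToyWorker.start
puts worker.class
puts worker.started
```

A caller who needs to know whether the worker actually started would have to call `new` and `start` separately.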

Instance Method Details

#id ⇒ Object

The worker ID.



# File 'lib/exekutor/worker.rb', line 123

def id
  @record.id
end

#join ⇒ Object

Blocks until the worker is stopped.



# File 'lib/exekutor/worker.rb', line 109

def join
  @stop_event = Concurrent::Event.new
  Kernel.loop do
    @stop_event.wait 10
    break unless running?
  end
end
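
The loop in `#join` waits on an event with a 10 second timeout and re-checks `running?` after each wake-up, so the event acts as a fast path and the timeout as a fallback. The sketch below imitates that pattern with the standard library only. `ToyEvent` is a hypothetical substitute for `Concurrent::Event`, and `JoinableWorker` is a hypothetical stand-in for the worker.

```ruby
# Stdlib-only stand-in for Concurrent::Event (hypothetical, for illustration).
class ToyEvent
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @set = false
  end

  # Marks the event as set and wakes all waiting threads.
  def set
    @mutex.synchronize do
      @set = true
      @cond.broadcast
    end
    true
  end

  # Waits up to +timeout+ seconds; returns whether the event is set.
  def wait(timeout)
    @mutex.synchronize do
      @cond.wait(@mutex, timeout) unless @set
      @set
    end
  end
end

# Hypothetical worker that models only #join and #stop.
class JoinableWorker
  def initialize
    @running = true
  end

  def running?
    @running
  end

  # Blocks until #stop has been called.
  def join
    @stop_event = ToyEvent.new
    loop do
      @stop_event.wait 10
      break unless running?
    end
  end

  def stop
    @running = false
    @stop_event&.set # wakes a blocked #join immediately
    true
  end
end

worker = JoinableWorker.new
stopper = Thread.new do
  sleep 0.1
  worker.stop
end

started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
worker.join
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
stopper.join
puts "join returned after #{elapsed.round(2)}s"
```

In this sketch, a `stop` that arrives before `join` has created the event is still noticed, but only once the 10 second wait times out.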

#kill ⇒ Object

Kills the worker. Does not wait for any jobs to be completed.

Returns:

  • true



# File 'lib/exekutor/worker.rb', line 94

def kill
  Thread.new do
    @executables.reverse_each(&:stop)
    @stop_event&.set if defined?(@stop_event)
  end
  @executor.kill
  begin
    @record.destroy
  rescue StandardError
    # ignored
  end
  true
end

#last_heartbeat ⇒ Time?

Returns the timestamp of the last heartbeat, truncated to whole minutes.

Returns:

  • (Time, nil)

    The timestamp of the last heartbeat. The timestamp is truncated to whole minutes.



# File 'lib/exekutor/worker.rb', line 128

def last_heartbeat
  @record.last_heartbeat_at
end
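
This page does not show where the truncation happens, so the following is only a sketch of what "truncated to whole minutes" means for a caller. The helper name is hypothetical. One consequence is that two heartbeats within the same minute are indistinguishable.

```ruby
# Truncates a Time to the start of its minute (hypothetical helper).
def truncate_to_minute(time)
  seconds = time.to_i
  Time.at(seconds - (seconds % 60)).utc
end

heartbeat = Time.utc(2024, 1, 1, 12, 34, 56)
puts truncate_to_minute(heartbeat)
```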

#reserve_jobs ⇒ Object

Reserves and executes jobs.



# File 'lib/exekutor/worker.rb', line 118

def reserve_jobs
  @provider.poll if @provider&.running?
end

#start ⇒ Boolean

Starts the worker. Does nothing if the worker has already started.

Returns:

  • (Boolean)

    whether the worker was started



# File 'lib/exekutor/worker.rb', line 55

def start
  return false unless compare_and_set_state(:pending, :started)

  Internal::Hooks.run :startup, self do
    @executables.each(&:start)
    @record.update(status: "r")
  end

  true
end
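
`#start` is idempotent because it only proceeds when the state can be moved from `:pending` to `:started`. The implementation of `compare_and_set_state` in `Internal::Executable` is not shown on this page, so the version below is an assumption: a mutex-guarded check-and-set in a hypothetical class, meant only to illustrate why a second call returns false.

```ruby
# Hypothetical state holder; not the real Internal::Executable.
class ToyExecutable
  attr_reader :state

  def initialize
    @state = :pending
    @lock = Mutex.new
  end

  # Returns true on the first call and false on every later call.
  def start
    return false unless compare_and_set_state(:pending, :started)

    true
  end

  private

  # Sets the state to +new_state+ only if it is currently +expected+.
  def compare_and_set_state(expected, new_state)
    @lock.synchronize do
      return false unless @state == expected

      @state = new_state
      true
    end
  end
end

executable = ToyExecutable.new
first = executable.start
second = executable.start
puts "first: #{first}, second: #{second}, state: #{executable.state}"
```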

#stop ⇒ Object

Stops the worker. If wait_for_termination is set, this method blocks until the execution thread is terminated or killed.

Returns:

  • true



# File 'lib/exekutor/worker.rb', line 69

def stop
  Internal::Hooks.run :shutdown, self do
    self.state = :stopped
    unless @record.destroyed?
      begin
        @record.update(status: "s")
      rescue StandardError
        # ignored
      end
    end
    @executables.reverse_each(&:stop)
    wait_for_termination @config[:wait_for_termination]

    begin
      @record.destroy
    rescue StandardError
      # ignored
    end
    @stop_event&.set if defined?(@stop_event)
  end
  true
end

#thread_stats ⇒ Hash

Returns the thread usage for this worker. The resulting hash will contain the following key-value pairs:

  • :minimum, (Integer) the minimum number of threads that should be active;

  • :maximum, (Integer) the maximum number of threads that may be active;

  • :available, (Integer) the number of threads that are available to execute new jobs;

  • :usage_percent, (Float, 0-100) the percentage of threads that are currently busy executing jobs.

Returns:

  • (Hash)

    the thread usage



# File 'lib/exekutor/worker.rb', line 138

def thread_stats
  available = @executor.available_threads
  usage_percent = (((100 - (available * 100.0 / @executor.maximum_threads))).round(2) if @executor.running?)
  {
    minimum: @executor.minimum_threads,
    maximum: @executor.maximum_threads,
    available: available,
    usage_percent: usage_percent
  }
end
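
The `usage_percent` value is derived from the available and maximum thread counts. The helper below is hypothetical and repeats only the arithmetic from `#thread_stats`, with a few worked values. In the method itself the value is `nil` when the executor is not running, which this sketch does not model.

```ruby
# Same arithmetic as #thread_stats, extracted into a hypothetical helper.
def usage_percent(available, maximum)
  (100 - (available * 100.0 / maximum)).round(2)
end

puts usage_percent(3, 10)  # 7 of 10 threads busy
puts usage_percent(10, 10) # all threads idle
puts usage_percent(2, 3)   # 1 of 3 threads busy, rounded to two decimals
```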