Class: Workhorse::Worker
- Inherits:
-
Object
- Object
- Workhorse::Worker
- Defined in:
- lib/workhorse/worker.rb
Constant Summary collapse
- LOG_LEVELS =
%i[fatal error warn info debug].freeze
- SHUTDOWN_SIGNALS =
%w[TERM INT].freeze
- LOG_REOPEN_SIGNAL =
'HUP'.freeze
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#mutex ⇒ Object
readonly
Returns the value of attribute mutex.
-
#poller ⇒ Object
readonly
Returns the value of attribute poller.
-
#polling_interval ⇒ Object
readonly
Returns the value of attribute polling_interval.
-
#pool_size ⇒ Object
readonly
Returns the value of attribute pool_size.
-
#queues ⇒ Object
readonly
Returns the value of attribute queues.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
Class Method Summary collapse
- .shutdown_file_for(pid) ⇒ Object
-
.start_and_wait(**args) ⇒ Object
Instantiates and starts a new worker with the given arguments and then waits for its completion (i.e. an interrupt).
Instance Method Summary collapse
- #assert_state!(state) ⇒ Object
- #hostname ⇒ Object
- #id ⇒ Object
- #idle ⇒ Object
-
#initialize(queues: [], pool_size: nil, polling_interval: 300, auto_terminate: true, quiet: true, instant_repolling: false, logger: nil) ⇒ Worker
constructor
Instantiates a new worker.
- #log(text, level = :info) ⇒ Object
- #perform(db_job_id) ⇒ Object
- #pid ⇒ Object
-
#shutdown ⇒ Object
Shuts down worker and DB poller.
-
#start ⇒ Object
Starts the worker.
-
#wait ⇒ Object
Waits until the worker is shut down.
Constructor Details
#initialize(queues: [], pool_size: nil, polling_interval: 300, auto_terminate: true, quiet: true, instant_repolling: false, logger: nil) ⇒ Worker
Instantiates a new worker. The worker is not automatically started.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/workhorse/worker.rb', line 50 def initialize(queues: [], pool_size: nil, polling_interval: 300, auto_terminate: true, quiet: true, instant_repolling: false, logger: nil) @queues = queues @pool_size = pool_size || (queues.size + 1) @polling_interval = polling_interval @auto_terminate = auto_terminate @state = :initialized @quiet = quiet @mutex = Mutex.new @pool = Pool.new(@pool_size) @poller = Workhorse::Poller.new(self, proc { check_memory }) @logger = logger unless (@polling_interval / 0.1).round(2).modulo(1).zero? fail 'Polling interval must be a multiple of 0.1.' end if instant_repolling @pool.on_idle { @poller.instant_repoll! } end end |
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
12 13 14 |
# File 'lib/workhorse/worker.rb', line 12 def logger @logger end |
#mutex ⇒ Object (readonly)
Returns the value of attribute mutex.
11 12 13 |
# File 'lib/workhorse/worker.rb', line 11 def mutex @mutex end |
#poller ⇒ Object (readonly)
Returns the value of attribute poller.
13 14 15 |
# File 'lib/workhorse/worker.rb', line 13 def poller @poller end |
#polling_interval ⇒ Object (readonly)
Returns the value of attribute polling_interval.
10 11 12 |
# File 'lib/workhorse/worker.rb', line 10 def polling_interval @polling_interval end |
#pool_size ⇒ Object (readonly)
Returns the value of attribute pool_size.
9 10 11 |
# File 'lib/workhorse/worker.rb', line 9 def pool_size @pool_size end |
#queues ⇒ Object (readonly)
Returns the value of attribute queues.
7 8 9 |
# File 'lib/workhorse/worker.rb', line 7 def queues @queues end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
8 9 10 |
# File 'lib/workhorse/worker.rb', line 8 def state @state end |
Class Method Details
.shutdown_file_for(pid) ⇒ Object
24 25 26 27 |
# File 'lib/workhorse/worker.rb', line 24 def self.shutdown_file_for(pid) return nil unless defined?(Rails) Rails.root.join('tmp', 'pids', "workhorse.#{pid}.shutdown") end |
.start_and_wait(**args) ⇒ Object
Instantiates and starts a new worker with the given arguments and then waits for its completion (i.e. an interrupt).
17 18 19 20 21 |
# File 'lib/workhorse/worker.rb', line 17 def self.start_and_wait(**args) worker = new(**args) worker.start worker.wait end |
Instance Method Details
#assert_state!(state) ⇒ Object
107 108 109 |
# File 'lib/workhorse/worker.rb', line 107 def assert_state!(state) fail "Expected worker to be in state #{state} but current state is #{self.state}." unless self.state == state end |
#hostname ⇒ Object
88 89 90 |
# File 'lib/workhorse/worker.rb', line 88 def hostname @hostname ||= Socket.gethostname end |
#id ⇒ Object
80 81 82 |
# File 'lib/workhorse/worker.rb', line 80 def id @id ||= "#{hostname}.#{pid}.#{SecureRandom.hex(3)}" end |
#idle ⇒ Object
140 141 142 |
# File 'lib/workhorse/worker.rb', line 140 def idle @pool.idle end |
#log(text, level = :info) ⇒ Object
72 73 74 75 76 77 78 |
# File 'lib/workhorse/worker.rb', line 72 def log(text, level = :info) text = "[Job worker #{id}] #{text}" puts text unless @quiet return unless logger fail "Log level #{level} is not available. Available are #{LOG_LEVELS.inspect}." unless LOG_LEVELS.include?(level) logger.send(level, text.strip) end |
#perform(db_job_id) ⇒ Object
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/workhorse/worker.rb', line 144 def perform(db_job_id) begin # rubocop:disable Style/RedundantBegin mutex.synchronize do assert_state! :running log "Posting job #{db_job_id} to thread pool" @pool.post do begin # rubocop:disable Style/RedundantBegin Workhorse::Performer.new(db_job_id, self).perform rescue Exception => e log %(#{e.message}\n#{e.backtrace.join("\n")}), :error Workhorse.on_exception.call(e) end end end rescue Exception => e Workhorse.on_exception.call(e) end end |
#pid ⇒ Object
84 85 86 |
# File 'lib/workhorse/worker.rb', line 84 def pid @pid ||= Process.pid end |
#shutdown ⇒ Object
Shuts down worker and DB poller. Jobs currently being processed are properly finished before this method returns. Subsequent calls to this method are ignored.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/workhorse/worker.rb', line 114 def shutdown # This is safe to be checked outside of the mutex as 'shutdown' is the # final state this worker can be in. return if @state == :shutdown mutex.synchronize do assert_state! :running log 'Shutting down' @state = :shutdown @poller.shutdown @pool.shutdown log 'Shut down' end end |
#start ⇒ Object
Starts the worker. This call is non-blocking; call #wait to block until the worker is shut down.
94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/workhorse/worker.rb', line 94 def start mutex.synchronize do assert_state! :initialized log 'Starting up' @state = :running @poller.start log 'Started up' trap_termination if @auto_terminate trap_log_reopen end end |
#wait ⇒ Object
Waits until the worker is shut down. This only happens if shutdown gets called - either by another thread or by enabling `auto_terminate` and receiving a respective signal. Use this method to let the worker run indefinitely.
135 136 137 138 |
# File 'lib/workhorse/worker.rb', line 135 def wait @poller.wait @pool.wait end |