Class: RocketJob::Worker
- Inherits:
-
Object
- Object
- RocketJob::Worker
- Includes:
- AASM, MongoMapper::Document, SemanticLogger::Loggable, SyncAttr
- Defined in:
- lib/rocket_job/worker.rb
Overview
Worker
On startup a worker instance will automatically register itself if not already present
Starting a worker in the foreground:
- Using a Rails runner:
bin/rocketjob
Starting a worker in the background:
- Using a Rails runner:
nohup bin/rocketjob --quiet 2>&1 1>output.log &
Stopping a worker:
- Stop the worker via the Web UI
- Send a regular kill signal to make it shutdown once all active work is complete
kill <pid>
- Or, use the following Ruby code:
worker = RocketJob::Worker.where(name: 'worker name').first
worker.stop!
Sending the kill signal locally will result in starting the shutdown process
immediately. Via the UI or Ruby code the worker can take up to 15 seconds
(the heartbeat interval) to start shutting down.
Instance Attribute Summary collapse
-
#thread_pool ⇒ Object
readonly
Returns [Array<Thread>] threads in the thread_pool.
Class Method Summary collapse
-
.create_indexes ⇒ Object
Create indexes.
-
.destroy_dead_workers ⇒ Object
Destroy dead workers ( missed at least the last 4 heartbeats ) Requeue jobs assigned to dead workers Destroy dead workers.
-
.pause_all ⇒ Object
Pause all running workers.
-
.register_destroy_handler(&block) ⇒ Object
Register a handler to perform cleanups etc.
-
.resume_all ⇒ Object
Resume all paused workers.
-
.run(attrs = {}) ⇒ Object
Run the worker process Attributes supplied are passed to #new.
-
.stop_all ⇒ Object
Stop all running, paused, or starting workers.
Instance Method Summary collapse
-
#run ⇒ Object
Run this instance of the worker.
-
#shutting_down? ⇒ Boolean
Returns [Boolean] whether the worker is shutting down.
- #thread_pool_count ⇒ Object
Instance Attribute Details
#thread_pool ⇒ Object (readonly)
Returns [Array<Thread>] threads in the thread_pool
159 160 161 |
# File 'lib/rocket_job/worker.rb', line 159 def thread_pool @thread_pool end |
Class Method Details
.create_indexes ⇒ Object
Create indexes
109 110 111 112 113 |
# File 'lib/rocket_job/worker.rb', line 109 def self.create_indexes ensure_index [[:name, 1]], background: true, unique: true # Also create indexes for the jobs collection Job.create_indexes end |
.destroy_dead_workers ⇒ Object
Destroy dead workers ( missed at least the last 4 heartbeats ) Requeue jobs assigned to dead workers Destroy dead workers
118 119 120 121 122 123 124 125 |
# File 'lib/rocket_job/worker.rb', line 118 def self.destroy_dead_workers dead_seconds = Config.instance.heartbeat_seconds * 4 each do |worker| next if (Time.now - worker.heartbeat.updated_at) < dead_seconds logger.warn "Destroying worker #{worker.name}, and requeueing its jobs" worker.destroy end end |
.pause_all ⇒ Object
Pause all running workers
133 134 135 |
# File 'lib/rocket_job/worker.rb', line 133 def self.pause_all where(state: 'running').each { |worker| worker.pause! } end |
.register_destroy_handler(&block) ⇒ Object
Register a handler to perform cleanups etc. whenever a worker is explicitly destroyed
144 145 146 |
# File 'lib/rocket_job/worker.rb', line 144 def self.register_destroy_handler(&block) @@destroy_handlers << block end |
.resume_all ⇒ Object
Resume all paused workers
138 139 140 |
# File 'lib/rocket_job/worker.rb', line 138 def self.resume_all each { |worker| worker.resume! if worker.paused? } end |
.run(attrs = {}) ⇒ Object
Run the worker process Attributes supplied are passed to #new
97 98 99 100 101 102 103 104 105 106 |
# File 'lib/rocket_job/worker.rb', line 97 def self.run(attrs={}) worker = new(attrs) worker.build_heartbeat worker.save! create_indexes register_signal_handlers raise "The RocketJob configuration is being applied after the system has been initialized" unless RocketJob::Job.database.name == RocketJob::SlicedJob.database.name logger.info "Using MongoDB Database: #{RocketJob::Job.database.name}" worker.run end |
.stop_all ⇒ Object
Stop all running, paused, or starting workers
128 129 130 |
# File 'lib/rocket_job/worker.rb', line 128 def self.stop_all where(state: ['running', 'paused', 'starting']).each { |worker| worker.stop! } end |
Instance Method Details
#run ⇒ Object
Run this instance of the worker
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/rocket_job/worker.rb', line 164 def run Thread.current.name = 'RocketJob main' build_heartbeat unless heartbeat started adjust_thread_pool(true) save logger.info "RocketJob Worker started with #{max_threads} workers running" count = 0 loop do # Update heartbeat so that monitoring tools know that this worker is alive set( 'heartbeat.updated_at' => Time.now, 'heartbeat.current_threads' => thread_pool_count ) # Reload the worker model every 10 heartbeats in case its config was changed # TODO make 3 configurable if count >= 3 reload adjust_thread_pool count = 0 else count += 1 end # Stop worker if shutdown signal was raised stop! if self.class.shutdown && !stopping? break if stopping? sleep Config.instance.heartbeat_seconds end logger.info 'Waiting for worker threads to stop' # TODO Put a timeout on join. # Log Thread dump for active threads # Compare thread dumps for any changes, force down if no change? # reload, if model missing: Send Shutdown exception to each thread # 5 more seconds then exit thread_pool.each { |t| t.join } logger.info 'Shutdown' rescue Exception => exc logger.error('RocketJob::Worker is stopping due to an exception', exc) ensure # Destroy this worker instance destroy end |
#shutting_down? ⇒ Boolean
Returns [Boolean] whether the worker is shutting down
149 150 151 152 153 154 155 156 |
# File 'lib/rocket_job/worker.rb', line 149 def shutting_down? if self.class.shutdown stop! if running? true else !running? end end |
#thread_pool_count ⇒ Object
213 214 215 |
# File 'lib/rocket_job/worker.rb', line 213 def thread_pool_count thread_pool.count{ |i| i.alive? } end |