Class: RocketJob::Worker

Inherits:
Object
Includes:
AASM, MongoMapper::Document, SemanticLogger::Loggable, SyncAttr
Defined in:
lib/rocket_job/worker.rb

Overview

Worker

On startup, a worker instance automatically registers itself if it is not already present.

Starting a worker in the foreground:

- Using a Rails runner:
  bin/rocketjob

Starting a worker in the background:

- Using a Rails runner:
  nohup bin/rocketjob --quiet >output.log 2>&1 &

Stopping a worker:

- Stop the worker via the Web UI
- Send a regular kill signal to make it shut down once all active work is complete
    kill <pid>
- Or, use the following Ruby code:
  worker = RocketJob::Worker.where(name: 'worker name').first
  worker.stop!

Sending the kill signal locally starts the shutdown process immediately.
When stopped via the Web UI or Ruby code, the worker can take up to 15 seconds
(the heartbeat interval) to start shutting down.
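
A minimal sketch of why a stop requested through the Web UI or Ruby code is noticed later than a local signal, assuming the worker only checks for the request once per heartbeat. `PollingWorker` and `stop_requested` are hypothetical stand-ins rather than RocketJob's API, and the heartbeat is shortened from 15 seconds to 0.05 seconds so the example finishes quickly:

```ruby
# Hypothetical stand-in for a worker that only looks for a stop request
# once per heartbeat. This is not RocketJob's implementation.
class PollingWorker
  attr_accessor :stop_requested
  attr_reader :heartbeats

  def initialize(heartbeat_seconds)
    @heartbeat_seconds = heartbeat_seconds
    @stop_requested    = false
    @heartbeats        = 0
  end

  def run
    loop do
      @heartbeats += 1
      break if stop_requested
      sleep @heartbeat_seconds
    end
  end
end

worker = PollingWorker.new(0.05)
thread = Thread.new { worker.run }

sleep 0.12                      # let a few heartbeats pass
requested_at = Time.now
worker.stop_requested = true    # comparable to calling stop! from another process
thread.join

# The worker only reacts at its next heartbeat check, so the delay is
# bounded by roughly one heartbeat interval.
delay = Time.now - requested_at
puts "Stopped after #{worker.heartbeats} heartbeats, #{delay.round(3)}s after the request"
```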

Instance Attribute Details

#thread_pool ⇒ Object (readonly)

Returns [Array<Thread>] threads in the thread_pool



# File 'lib/rocket_job/worker.rb', line 159

def thread_pool
  @thread_pool
end

Class Method Details

.create_indexes ⇒ Object

Create indexes



# File 'lib/rocket_job/worker.rb', line 109

def self.create_indexes
  ensure_index [[:name, 1]], background: true, unique: true
  # Also create indexes for the jobs collection
  Job.create_indexes
end

.destroy_dead_workers ⇒ Object

Destroy dead workers (those that have missed at least the last 4 heartbeats) and requeue the jobs assigned to them.



# File 'lib/rocket_job/worker.rb', line 118

def self.destroy_dead_workers
  dead_seconds = Config.instance.heartbeat_seconds * 4
  each do |worker|
    next if (Time.now - worker.heartbeat.updated_at) < dead_seconds
    logger.warn "Destroying worker #{worker.name}, and requeueing its jobs"
    worker.destroy
  end
end
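
The dead-worker test above can be tried without MongoDB. This is a minimal sketch, assuming the 15-second heartbeat interval mentioned in the overview; `WorkerRecord` and `dead_workers` are hypothetical in-memory stand-ins for the model and its query:

```ruby
# In-memory stand-in for a worker document; the real model keeps the
# heartbeat in MongoDB.
WorkerRecord = Struct.new(:name, :heartbeat_updated_at)

# Returns the records whose last heartbeat is at least 4 intervals old,
# using the same comparison as .destroy_dead_workers.
def dead_workers(workers, heartbeat_seconds, now = Time.now)
  dead_seconds = heartbeat_seconds * 4
  workers.reject { |w| (now - w.heartbeat_updated_at) < dead_seconds }
end

now     = Time.now
workers = [
  WorkerRecord.new('alive', now - 30), # 2 heartbeats ago
  WorkerRecord.new('dead',  now - 90)  # 6 heartbeats ago
]

dead_names = dead_workers(workers, 15, now).map(&:name)
puts dead_names.inspect # => ["dead"]
```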

.pause_all ⇒ Object

Pause all running workers



# File 'lib/rocket_job/worker.rb', line 133

def self.pause_all
  where(state: 'running').each { |worker| worker.pause! }
end

.register_destroy_handler(&block) ⇒ Object

Register a handler to perform cleanups etc. whenever a worker is explicitly destroyed



# File 'lib/rocket_job/worker.rb', line 144

def self.register_destroy_handler(&block)
  @@destroy_handlers << block
end
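
A minimal sketch of the handler-registry pattern used here. The source only shows handlers being appended to a class variable; how and with which arguments they are later invoked is an assumption in this example, and `SketchWorker` is a hypothetical stand-in class:

```ruby
class SketchWorker
  @@destroy_handlers = []

  # Same shape as .register_destroy_handler: store the block for later.
  def self.register_destroy_handler(&block)
    @@destroy_handlers << block
  end

  # Assumption: handlers run in registration order with no arguments.
  def destroy
    @@destroy_handlers.each(&:call)
  end
end

cleaned = []
SketchWorker.register_destroy_handler { cleaned << :requeue_jobs }
SketchWorker.register_destroy_handler { cleaned << :release_resources }

SketchWorker.new.destroy
puts cleaned.inspect # => [:requeue_jobs, :release_resources]
```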

.resume_all ⇒ Object

Resume all paused workers



# File 'lib/rocket_job/worker.rb', line 138

def self.resume_all
  each { |worker| worker.resume! if worker.paused? }
end

.run(attrs = {}) ⇒ Object

Run the worker process. Attributes supplied are passed to #new.



# File 'lib/rocket_job/worker.rb', line 97

def self.run(attrs={})
  worker = new(attrs)
  worker.build_heartbeat
  worker.save!
  create_indexes
  register_signal_handlers
  raise "The RocketJob configuration is being applied after the system has been initialized" unless RocketJob::Job.database.name == RocketJob::SlicedJob.database.name
  logger.info "Using MongoDB Database: #{RocketJob::Job.database.name}"
  worker.run
end

.stop_all ⇒ Object

Stop all running, paused, or starting workers



# File 'lib/rocket_job/worker.rb', line 128

def self.stop_all
  where(state: ['running', 'paused', 'starting']).each { |worker| worker.stop! }
end
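
The state filter above can be illustrated in memory. This is a sketch only: `StateRecord` and `stoppable` are hypothetical names, the `'stopping'` state is inferred from the `stopping?` check in #run, and the real method runs the equivalent query against MongoDB before calling `stop!` on each match:

```ruby
StateRecord = Struct.new(:name, :state)

# States that .stop_all selects, copied from the query above.
STOPPABLE_STATES = %w[running paused starting].freeze

def stoppable(workers)
  workers.select { |w| STOPPABLE_STATES.include?(w.state) }
end

workers = [
  StateRecord.new('a', 'running'),
  StateRecord.new('b', 'paused'),
  StateRecord.new('c', 'stopping'), # already stopping, so it is skipped
  StateRecord.new('d', 'starting')
]

selected = stoppable(workers).map(&:name)
puts selected.inspect # => ["a", "b", "d"]
```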

Instance Method Details

#run ⇒ Object

Run this instance of the worker



# File 'lib/rocket_job/worker.rb', line 164

def run
  Thread.current.name = 'RocketJob main'
  build_heartbeat unless heartbeat

  started
  adjust_thread_pool(true)
  save
  logger.info "RocketJob Worker started with #{max_threads} workers running"

  count = 0
  loop do
    # Update heartbeat so that monitoring tools know that this worker is alive
    set(
      'heartbeat.updated_at'      => Time.now,
      'heartbeat.current_threads' => thread_pool_count
    )

    # Reload the worker model every 4th heartbeat in case its config was changed
    # TODO make the threshold of 3 configurable
    if count >= 3
      reload
      adjust_thread_pool
      count = 0
    else
      count += 1
    end

    # Stop worker if shutdown signal was raised
    stop! if self.class.shutdown && !stopping?

    break if stopping?

    sleep Config.instance.heartbeat_seconds
  end
  logger.info 'Waiting for worker threads to stop'
  # TODO Put a timeout on join.
  # Log Thread dump for active threads
  # Compare thread dumps for any changes, force down if no change?
  # reload, if model missing: Send Shutdown exception to each thread
  #           5 more seconds then exit
  thread_pool.each { |t| t.join }
  logger.info 'Shutdown'
rescue Exception => exc
  logger.error('RocketJob::Worker is stopping due to an exception', exc)
ensure
  # Destroy this worker instance
  destroy
end
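
The reload cadence of the loop above can be checked by replaying only its counter logic. In this sketch, `reload_iterations` is a hypothetical helper rather than part of RocketJob; it records which heartbeats would trigger `reload` and `adjust_thread_pool`:

```ruby
# Replays the counter from #run for a fixed number of heartbeats and
# returns the 1-based heartbeat numbers on which the model is reloaded.
def reload_iterations(heartbeats, threshold = 3)
  count    = 0
  reloaded = []
  heartbeats.times do |i|
    if count >= threshold
      reloaded << i + 1
      count = 0
    else
      count += 1
    end
  end
  reloaded
end

iterations = reload_iterations(12)
puts iterations.inspect # => [4, 8, 12]
```

With the threshold of 3 in the source, the counter passes through 0, 1, 2 and 3 before resetting, so the reload happens on every fourth heartbeat.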

#shutting_down? ⇒ Boolean

Returns [Boolean] whether the worker is shutting down

Returns:

  • (Boolean)


# File 'lib/rocket_job/worker.rb', line 149

def shutting_down?
  if self.class.shutdown
    stop! if running?
    true
  else
    !running?
  end
end
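
The return value of the method above depends on two inputs: the class-level shutdown flag and whether the worker is running. The sketch below restates that logic as a pure function; the name `shutting_down_result` is hypothetical, and the `stop!` side effect is deliberately left out:

```ruby
# Pure restatement of the return value of #shutting_down?.
# The real method also calls stop! when the flag is set and the worker
# is still running; that side effect is omitted here.
def shutting_down_result(shutdown_flag, running)
  shutdown_flag ? true : !running
end

table = [true, false].product([true, false]).map do |flag, running|
  [flag, running, shutting_down_result(flag, running)]
end

table.each do |flag, running, result|
  puts "shutdown=#{flag} running=#{running} => #{result}"
end
```

Only a running worker with no shutdown flag reports `false`.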

#thread_pool_count ⇒ Object

Returns the number of threads in the thread pool that are still alive



# File 'lib/rocket_job/worker.rb', line 213

def thread_pool_count
  thread_pool.count{ |i| i.alive? }
end
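
A small sketch of the same counting technique on a plain array of threads, using only Ruby's standard `Thread` API. The `pool` array is a stand-in for the worker's thread pool:

```ruby
pool = [
  Thread.new { sleep },  # sleeps until it is killed
  Thread.new { :done }   # finishes immediately
]
pool.last.join           # make sure the second thread has finished

# Same counting approach as #thread_pool_count.
alive_before = pool.count { |t| t.alive? }

# Clean up the sleeping thread, then count again.
pool.first.kill
pool.first.join
alive_after = pool.count { |t| t.alive? }

puts "alive before: #{alive_before}, after: #{alive_after}"
```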