Class: MongoJob::Worker

Inherits:
Object
  • Object
show all
Extended by:
Helpers, Mixins::FiberRunner::ClassMethods
Includes:
Helpers, Mixins::FiberRunner::InstanceMethods
Defined in:
lib/mongojob/worker.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Mixins::FiberRunner::ClassMethods

task

Methods included from Helpers

classify, constantize

Methods included from Mixins::FiberRunner::InstanceMethods

#run_defined_tasks, #run_em_fiber

Constructor Details

#initialize(*queues) ⇒ Worker

Workers should be initialized with an array of string queue names. The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.

If passed a single “*”, this Worker will operate on all queues in alphabetical order. Queues can be dynamically added or removed without needing to restart workers using this method.



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/mongojob/worker.rb', line 48

# Workers are initialized with string queue names in priority order. A
# trailing Hash argument is treated as an options hash and merged over
# Worker.default_options.
#
# Recognized options: :queues (fallback queue list), :id, :max_jobs,
# :log (IO or path for Logger), :loglevel.
#
# Raises a RuntimeError when no queues are given either way.
def initialize(*queues)
  opts = queues.last.is_a?(Hash) ? queues.pop : {}
  opts = self.class.default_options.merge(opts)
  queues = opts[:queues] if queues.nil? || queues.empty?
  raise "No queues provided" if queues.nil? || queues.empty?

  @id = opts[:id]
  @queues = queues
  @max_jobs = opts[:max_jobs]
  @current_jobs = []
  @job_pids = {}

  # Initialize logger
  @log = ::Logger.new opts[:log]
  @log.formatter = Logger::Formatter.new
  @log.level = opts[:loglevel]
  # NOTE(review): exposes the logger through a global; presumably read by
  # job code elsewhere — verify before removing.
  $log = log
end

Instance Attribute Details

#current_jobsObject

Returns the value of attribute current_jobs.



26
27
28
# File 'lib/mongojob/worker.rb', line 26

# Reader for the ids of the jobs this worker is currently processing
# (populated by #get_new_job, pruned by #finish_job).
def current_jobs
  @current_jobs
end

#logObject

Returns the value of attribute log.



27
28
29
# File 'lib/mongojob/worker.rb', line 27

# Reader for this worker's Logger instance (configured in #initialize).
def log
  @log
end

Class Method Details

.default_optionsObject



29
30
31
32
33
34
35
# File 'lib/mongojob/worker.rb', line 29

# Baseline configuration merged under any options passed to #initialize.
# Memoized, so every caller sees the same Hash instance.
def self.default_options
  @default_options ||= { max_jobs: 1, log: STDOUT, loglevel: Logger::DEBUG }
end

.parse_optionsObject

Parse command-line parameters



337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
# File 'lib/mongojob/worker.rb', line 337

# Parse command-line parameters into an options Hash suitable for
# Worker.new. Also applies -h/-d straight to MongoJob connection settings
# as a side effect.
#
# @return [Hash] options such as :queues, :log, :loglevel, :id, :max_jobs
def self.parse_options
  options = {}
  OptionParser.new do |opts|
    opts.banner = "Usage: #{::File.basename($0)} [options]"
    opts.on('-q QUEUES', 'comma-separated queues this worker will handle') {|queues|
      options[:queues] = queues.split(/,\s*/)
    }
    opts.on('-h HOST', "--host HOST", "set the MongoDB host") {|host|
      MongoJob.host = host
    }
    opts.on('-d DATABASE_NAME', "--database-name DATABASE_NAME", "set the MongoDB database name") {|database_name|
      MongoJob.database_name = database_name
    }
    opts.on("-l LOGFILE", "logfile, or STDOUT to log to console") do |v|
      options[:log] = (v == 'STDOUT' ? STDOUT : v)
    end
    opts.on("-v LOGLEVEL", "one of DEBUG, INFO, WARN, ERROR, FATAL") do |v|
      # Convert the name to a Logger severity constant so the value has the
      # same (integer) type as the default_options entry; raises NameError
      # on an unknown level, which fails fast at startup.
      options[:loglevel] = Logger.const_get(v.upcase)
    end
    opts.on("-r LOAD_MODULE", "requires an extra ruby file") do |v|
      require v
    end
    opts.on("-i ID", "set worker id") do |v|
      options[:id] = v
    end
    opts.on("-m MAX_JOBS", "max jobs") do |v|
      options[:max_jobs] = v.to_i
    end
  end.parse!
  options
end

Instance Method Details

#custom_statusObject

Override this method if needed.



269
270
271
# File 'lib/mongojob/worker.rb', line 269

# Subclass hook: return a Hash of extra status information to be included
# in the periodic tick (merged in by #tick_data). Override this method if
# needed; the default reports nothing.
def custom_status
  {}
end

#fail_job(job, error) ⇒ Object

Mark job as failed



165
166
167
# File 'lib/mongojob/worker.rb', line 165

# Mark a job as failed by delegating to the job's own failure handling.
#
# @param job [#fail] the job being marked as failed
# @param error [Exception, String] the failure reason, passed through as-is
def fail_job(job, error)
  job.fail(error)
end

#finish_job(job) ⇒ Object

Removes job from the internal stack



158
159
160
161
162
# File 'lib/mongojob/worker.rb', line 158

# Remove a job from the worker's internal bookkeeping. Accepts either a
# job object (anything responding to #id) or a bare job id.
def finish_job job
  key = job.respond_to?(:id) ? job.id : job
  @current_jobs.delete key
  @job_pids.delete key
end

#fork(&blk) ⇒ Object

Forks a process and runs the code passed in the block in the new process



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/mongojob/worker.rb', line 170

# Forks a child process and runs the given block inside it.
#
# The child first clears the EventMachine reactor state inherited from the
# parent (stop, release, and reset the @reactor_running flag) so the block
# does not touch the parent's event loop, then exits with Process.exit!(0),
# skipping at_exit hooks.
#
# @return [Integer] pid of the forked child. The child is NOT detached —
#   the parent reaps it later (see Process.wait2 in #monitor_jobs).
def fork &blk
  pid = Process.fork do
    if EM.reactor_running?
      # Need to clear EM reactor
      EM.stop_event_loop
      EM.release_machine
      EM.instance_variable_set( '@reactor_running', false )
    end
    # TODO: Should we rescue exceptions from the block call?
    blk.call
    Process.exit!(0)
  end
  # Detach the process. We are not using Process.wait.
#      Process.detach pid
  pid
end

#get_new_jobObject



128
129
130
131
132
133
134
135
136
# File 'lib/mongojob/worker.rb', line 128

# Try to reserve one job for this worker, respecting @max_jobs and queue
# priority: queues are polled in order and the first that yields a job wins.
#
# @return [Object, nil] the reserved job, or nil when at capacity or when
#   no queue had work
def get_new_job
  return if @current_jobs.size >= @max_jobs
  job = nil
  @queues.each do |queue|
    job = MongoJob.reserve(queue, self.id)
    break if job
  end
  @current_jobs << job.id if job
  job
end

#hostnameObject

Whitespace-stripped hostname of this machine



68
69
70
# File 'lib/mongojob/worker.rb', line 68

# Memoized hostname of this machine, obtained by shelling out to the
# `hostname` command with surrounding whitespace stripped.
def hostname
  @hostname ||= `hostname`.strip
end

#idObject



72
73
74
# File 'lib/mongojob/worker.rb', line 72

# Unique worker identifier. Defaults to "<hostname>:<pid>" unless an
# explicit :id option was supplied to #initialize.
def id
  @id ||= "#{hostname}:#{Process.pid}"
end

#kill_jobsObject

Kills all jobs



317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
# File 'lib/mongojob/worker.rb', line 317

# Kill all jobs immediately: forked jobs get SIGKILL; fiber jobs (which
# have no pid) are marked failed in storage and dropped from bookkeeping.
def kill_jobs
  log.debug "Immediately killing all jobs"
  @job_pids.each do |jid, pid|
    log.debug "Killing process #{pid} with job #{jid}"
    Process.kill 'KILL', pid
  end

  # Fiber jobs cannot be signalled; iterate a copied list so finish_job's
  # deletions do not disturb the traversal.
  @current_jobs.reject { |jid| @job_pids[jid] }.each do |jid|
    job = MongoJob.find_job jid
    job.fail "Process killed." if job
    finish_job jid
  end
end

#monitor_jobsObject

Monitors running jobs and pings storage for those still alive. Currently only forked (child-process) jobs are monitored, not fiber-based ones.



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/mongojob/worker.rb', line 189

# Monitors forked jobs and pings storage for the ones still alive.
# For each tracked pid it inspects `ps` output, reaps zombies with
# Process.wait2, pings the job document while the process runs, and marks
# jobs failed / cleans up bookkeeping when the process is gone.
# (Fiber jobs are not monitored here — they have no pid.)
#
# Fix: iterate a snapshot (.dup) of @job_pids, because finish_job deletes
# entries from @job_pids and mutating a Hash while iterating it is unsafe.
def monitor_jobs
  @job_pids.dup.each do |job_id, pid|
    # Check if alive
    line = `ps -www -o rss,state -p #{pid}`.split("\n")[1]
    rss = state = nil
    running = true
    if line
      rss, state = line.split ' '
      log.debug "Process #{pid} for job #{job_id} in state #{state}, uses #{rss}k mem"
    else
      # Missing process, which means something went very wrong.
      # TODO: report it!
      log.debug "Process #{pid} for job #{job_id} is missing!"
      running = false
    end

    # Now check if finished, which means it will be in Z (zombie) status
    # TODO: should we use EventMachine#watch_process ?
    if state =~ /Z/
      # Process completed, collect information
      pid, status = Process.wait2 pid
      log.debug "Process #{pid} for job #{job_id} exited with status #{status.exitstatus}"
      running = false
    end

    job = MongoJob.find_job job_id

    if running
      # Still running, so ping database
      # One more thing to check - if the job does not exist, we are killing the process.
      if job
        job.ping
      else
        log.info "Job #{job_id} for process #{pid} is missing, killing"
        Process.kill 'KILL', pid
      end
    else
      # Process not running
      # Check the status of the job - if it is still marked as "working", we should set its
      # status to "failed"
      if job && job.status == 'working'
        job.fail "Process missing."
      end
      # For sure we are not working on it anymore, so remove from the stack
      finish_job job_id
    end

  end
end

#process_job(job) ⇒ Object

Processes the job, in the child process if forking.



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/mongojob/worker.rb', line 139

# Processes the job, in the child process if forking.
#
# Instantiates the job object, hands it the worker's logger, runs #perform,
# then records the outcome both on the job document and in the per-worker
# stats counters (stats.done / stats.failed).
#
# NOTE: rescuing Exception (not StandardError) is deliberate here — no
# matter what goes wrong during #perform, the job must be marked failed
# and counted before the (possibly forked) runner exits.
#
# Fix: removed the stray `p e` debug print that duplicated `log.info e`
# and polluted stdout.
def process_job job
  begin
    log.info "Performing job #{job.id}"
    jo = job.job_object
    jo.log = log
    jo.perform
    log.info "Job #{job.id} completed"
    job.complete
    Model::Worker.increment(id, {:'stats.done' => 1})
  rescue Exception => e
    log.info "Job #{job.id} failed"
    log.info e
    job.fail e
    Model::Worker.increment(id, {:'stats.failed' => 1})
  end
end

#real_ipObject

Retrieves the real IP address of the machine



274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/mongojob/worker.rb', line 274

# Determine (and memoize) the machine's outbound IP address.
#
# A UDP "connect" sends no packets; it merely makes the kernel pick the
# local interface/address that would route to the target, which we then
# read back from the socket. The target is an arbitrary public IP.
def real_ip
  return @real_ip if @real_ip
  begin
    # Temporarily disable reverse DNS lookups so the address query is fast.
    orig, Socket.do_not_reverse_lookup = Socket.do_not_reverse_lookup, true

    UDPSocket.open do |sock|
      sock.connect '64.233.187.99', 1
      @real_ip = sock.addr.last
    end
  ensure
    Socket.do_not_reverse_lookup = orig
  end
  @real_ip
end

#register_signal_handlersObject

Registers the various signal handlers a worker responds to.

TERM: Shutdown immediately, stop processing jobs.

INT: Shutdown immediately, stop processing jobs.

QUIT: Shutdown after the current job has finished processing.



294
295
296
297
298
299
300
301
# File 'lib/mongojob/worker.rb', line 294

# Registers the signal handlers this worker responds to.
#
# TERM / INT: shut down immediately, killing running jobs (#shutdown!).
# QUIT: shut down gracefully after current jobs finish (#shutdown).
def register_signal_handlers
  %w[TERM INT].each { |sig| trap(sig) { shutdown! } }
  trap('QUIT') { shutdown }

  log.info "Registered signals"
end

#runObject

Runs the worker



77
78
79
80
81
82
83
# File 'lib/mongojob/worker.rb', line 77

# Runs the worker: installs signal handlers, then starts the EventMachine
# reactor; the periodic work happens in the tasks started by
# #run_defined_tasks (from Mixins::FiberRunner::InstanceMethods).
# Blocks until the reactor stops.
def run
  log.info "Starting worker"
  register_signal_handlers
  EM.run do
    run_defined_tasks
  end
end

#shutdownObject

Schedule this worker for shutdown. Will finish processing the current jobs.



305
306
307
308
# File 'lib/mongojob/worker.rb', line 305

# Schedule this worker for a graceful shutdown: sets a flag that #work_job
# checks, exiting the process once @current_jobs drains to empty.
def shutdown
  log.info 'Shutting down...'
  @shutdown = true
end

#shutdown!Object

Kill the child and shutdown immediately.



311
312
313
314
# File 'lib/mongojob/worker.rb', line 311

# Immediate shutdown: schedule the shutdown flag, then kill/fail all
# in-flight jobs right away (see #kill_jobs).
def shutdown!
  shutdown
  kill_jobs
end

#tickObject

Periodically send pings so that we know that the worker is alive. The method also checks stored worker status and shuts down the worker if the stored status indicates failure or timeout.



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/mongojob/worker.rb', line 242

# Periodically report liveness: writes a ping timestamp, an 'ok' status,
# and the queue list (plus #tick_data extras) to the worker document.
def tick
  worker = Model::Worker.find id

  # Shut down if there is no worker status stored
  # shutdown! unless worker

  # Shut down if worker status is different than 'ok'
  # shutdown! unless worker.status == 'ok'

  payload = tick_data.merge(
    pinged_at: Time.now,
    status: 'ok',
    queues: @queues
  )
  Model::Worker.tick id, payload
end

#tick_dataObject

Prepares the data to be sent along with the tick.



260
261
262
263
264
265
266
# File 'lib/mongojob/worker.rb', line 260

# Prepares the data sent alongside each tick (see #tick).
def tick_data
  {
    hostname: hostname,          # machine name (see #hostname)
    ip: real_ip,                 # outbound IP address (see #real_ip)
    custom_status: custom_status # subclass hook, {} by default
  }
end

#work_jobObject

Contains the working cycle:

  1. Maintenance stuff

  2. Get a job

  3. Run a job



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/mongojob/worker.rb', line 89

# One pass of the working cycle:
#   1. maintenance (exit once a scheduled shutdown has drained all jobs)
#   2. claim a job from the queues
#   3. run it in the mode its job class requests: forked process,
#      dedicated fiber, or plain blocking call.
def work_job

  # MAINTENANCE: on scheduled shutdown, exit once nothing is running.
  Kernel.exit!(0) if @shutdown && @current_jobs.size == 0

  # PROCESSING: claim a job, bail out when every queue is empty.
  job = get_new_job
  return unless job
  log.info "Got a new job #{job.id}"

  klass = job.job_class
  if klass.fork?
    # Long-running work goes into a child process; remember its pid so
    # #monitor_jobs can ping/reap it (cleanup happens there, not here).
    log.debug "Forking the process for job #{job.id}"
    @job_pids[job.id] = fork { process_job job }
  elsif klass.fiber?
    # Cooperative job: run inside its own fiber and clean up when done.
    log.debug "Creating a new fiber for job #{job.id}"
    Fiber.new do
      process_job job
      finish_job job
    end.resume
  else
    # Old-school, blocking job executed inline.
    log.debug "Running job #{job.id} in the blocking mode"
    process_job job
    finish_job job
  end
end