Class: MongoJob::Worker
Inherits: Object
Extended by: Helpers, Mixins::FiberRunner::ClassMethods
Includes: Helpers, Mixins::FiberRunner::InstanceMethods
Defined in: lib/mongojob/worker.rb
Instance Attribute Summary
- #current_jobs ⇒ Object
  Returns the value of attribute current_jobs.
- #log ⇒ Object
  Returns the value of attribute log.
Class Method Summary
- .default_options ⇒ Object
- .parse_options ⇒ Object
  Parse command-line parameters.
Instance Method Summary
- #custom_status ⇒ Object
  Override this method if needed.
- #fail_job(job, error) ⇒ Object
  Mark job as failed.
- #finish_job(job) ⇒ Object
  Removes job from the internal stack.
- #fork(&blk) ⇒ Object
  Forks a process and runs the code passed in the block in the new process.
- #get_new_job ⇒ Object
- #hostname ⇒ Object
  chomp’d hostname of this machine.
- #id ⇒ Object
- #initialize(*queues) ⇒ Worker constructor
  Workers should be initialized with an array of string queue names.
- #kill_jobs ⇒ Object
  Kills all jobs.
- #monitor_jobs ⇒ Object
  Monitors jobs and pings storage if they are alive.
- #process_job(job) ⇒ Object
  Processes the job, in the child process if forking.
- #real_ip ⇒ Object
  Retrieves the real IP address of the machine.
- #register_signal_handlers ⇒ Object
  Registers the various signal handlers a worker responds to.
- #run ⇒ Object
  Runs the worker.
- #shutdown ⇒ Object
  Schedule this worker for shutdown.
- #shutdown! ⇒ Object
  Kill the child and shutdown immediately.
- #tick ⇒ Object
  Periodically send pings so that we know that the worker is alive.
- #tick_data ⇒ Object
  Prepares data to be sent along with the tick.
- #work_job ⇒ Object
  Contains the working cycle: maintenance, getting a new job, running the job.
Methods included from Mixins::FiberRunner::ClassMethods
Methods included from Helpers
Methods included from Mixins::FiberRunner::InstanceMethods
#run_defined_tasks, #run_em_fiber
Constructor Details
#initialize(*queues) ⇒ Worker
Workers should be initialized with an array of string queue names. The order is important: a Worker will check the first queue given for a job. If none is found, it will check the second queue name given. If a job is found, it will be processed. Upon completion, the Worker will again check the first queue given, and so forth. In this way the queue list passed to a Worker on startup defines the priorities of queues.
If passed a single “*”, this Worker will operate on all queues in alphabetical order. Queues can be dynamically added or removed without needing to restart workers using this method.
# File 'lib/mongojob/worker.rb', line 48

def initialize(*queues)
  options = {}
  options = queues.pop if queues.last.is_a?(Hash)
  options = self.class.default_options.merge(options)
  queues = options[:queues] if (queues.nil? || queues.empty?)
  raise "No queues provided" if (queues.nil? || queues.empty?)
  @id = options[:id]
  @queues = queues
  @max_jobs = options[:max_jobs]
  @current_jobs = []
  @job_pids = {}
  # Initialize logger
  @log = ::Logger.new options[:log]
  @log.formatter = Logger::Formatter.new
  @log.level = options[:loglevel]
  $log = log
end
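A minimal usage sketch, assuming the gem is loaded with require 'mongojob'; the queue names and option values shown are purely illustrative.

# Hypothetical startup script
require 'mongojob'

# Check 'critical' first, fall back to 'default'; allow up to 4 concurrent jobs.
worker = MongoJob::Worker.new('critical', 'default', max_jobs: 4, log: STDOUT)
worker.run

# Or let the worker operate on all queues in alphabetical order:
# worker = MongoJob::Worker.new('*')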
Instance Attribute Details
#current_jobs ⇒ Object
Returns the value of attribute current_jobs.
# File 'lib/mongojob/worker.rb', line 26

def current_jobs
  @current_jobs
end
#log ⇒ Object
Returns the value of attribute log.
# File 'lib/mongojob/worker.rb', line 27

def log
  @log
end
Class Method Details
.default_options ⇒ Object
# File 'lib/mongojob/worker.rb', line 29

def self.default_options
  @default_options ||= {
    max_jobs: 1,
    log: STDOUT,
    loglevel: Logger::DEBUG
  }
end
.parse_options ⇒ Object
Parse command-line parameters
# File 'lib/mongojob/worker.rb', line 337

def self.parse_options
  options = {}
  OptionParser.new do |opts|
    opts.banner = "Usage: #{::File.basename($0)} [options]"
    opts.on('-q QUEUES', 'comma-separated queues this worker will handle') {|queues| options[:queues] = queues.split(/,\s*/) }
    opts.on('-h HOST', "--host HOST", "set the MongoDB host") {|host| MongoJob.host = host }
    opts.on('-d DATABASE_NAME', "--database-name DATABASE_NAME", "set the MongoDB database name") {|database_name| MongoJob.database_name = database_name }
    opts.on("-l LOGFILE", "logfile, or STDOUT to log to console") do |v|
      options[:log] = (v == 'STDOUT' ? STDOUT : v)
    end
    opts.on("-v LOGLEVEL", "one of DEBUG, INFO, WARN, ERROR, FATAL") do |v|
      options[:loglevel] = v
    end
    opts.on("-r LOAD_MODULE", "requires an extra ruby file") do |v|
      require v
    end
    opts.on("-i ID", "set worker id") do |v|
      options[:id] = v
    end
    opts.on("-m MAX_JOBS", "max jobs") do |v|
      options[:max_jobs] = v.to_i
    end
  end.parse!
  options
end
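A sketch of a worker launch script built on parse_options, assuming the parsed options hash is returned and can be handed straight to the constructor; the script name and flags are illustrative.

# Hypothetical launch script, e.g. run as: ruby worker.rb -q critical,default -m 4
require 'mongojob'

options = MongoJob::Worker.parse_options
worker = MongoJob::Worker.new(options)  # queues come from -q via options[:queues]
worker.run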
Instance Method Details
#custom_status ⇒ Object
Override this method if needed.
# File 'lib/mongojob/worker.rb', line 269

def custom_status
  {}
end
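A subclass can override it to expose extra information, which then shows up under custom_status in #tick_data. A hedged sketch; the class name and reported fields are illustrative.

require 'mongojob'

class MyWorker < MongoJob::Worker
  # Illustrative override: report how many jobs are currently running.
  def custom_status
    { jobs_in_progress: current_jobs.size }
  end
end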
#fail_job(job, error) ⇒ Object
Mark job as failed
# File 'lib/mongojob/worker.rb', line 165

def fail_job(job, error)
  job.fail error
end
#finish_job(job) ⇒ Object
Removes job from the internal stack
# File 'lib/mongojob/worker.rb', line 158

def finish_job(job)
  job_id = job.respond_to?(:id) ? job.id : job
  @current_jobs.delete job_id
  @job_pids.delete(job_id)
end
#fork(&blk) ⇒ Object
Forks a process and runs the code passed in the block in the new process
# File 'lib/mongojob/worker.rb', line 170

def fork(&blk)
  pid = Process.fork do
    if EM.reactor_running?
      # Need to clear EM reactor
      EM.stop_event_loop
      EM.release_machine
      EM.instance_variable_set( '@reactor_running', false )
    end
    # TODO: Should we rescue exceptions from the block call?
    blk.call
    Process.exit!(0)
  end
  # Detach the process. We are not using Process.wait.
  # Process.detach pid
  pid
end
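A minimal sketch of calling this helper directly; the worker instance and block body are illustrative.

worker = MongoJob::Worker.new('default')
pid = worker.fork do
  # Runs in the child process, with the parent's EventMachine reactor state cleared.
  puts "Child #{Process.pid} doing some work"
end
# The parent keeps the pid; #monitor_jobs later reaps the child with Process.wait2.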
#get_new_job ⇒ Object
# File 'lib/mongojob/worker.rb', line 128

def get_new_job
  return if @current_jobs.size >= @max_jobs
  job = nil
  @queues.find do |queue|
    job = MongoJob.reserve(queue, self.id)
  end
  @current_jobs << job.id if job
  job
end
#hostname ⇒ Object
chomp’d hostname of this machine
# File 'lib/mongojob/worker.rb', line 68

def hostname
  @hostname ||= `hostname`.strip
end
#id ⇒ Object
# File 'lib/mongojob/worker.rb', line 72

def id
  @id ||= "#{hostname}:#{Process.pid}"
end
#kill_jobs ⇒ Object
Kills all jobs
# File 'lib/mongojob/worker.rb', line 317

def kill_jobs
  log.debug "Immediately killing all jobs"
  @job_pids.each do |job_id, pid|
    log.debug "Killing process #{pid} with job #{job_id}"
    Process.kill 'KILL', pid
  end
  # How to kill fiber jobs? Remove them from @current_jobs, mark as failed
  fiber_jobs = @current_jobs.select{|job_id| ! @job_pids[job_id]}
  fiber_jobs.each do |job_id|
    # FAIL FAIL FAIL!!!
    job = MongoJob.find_job job_id
    if job
      job.fail "Process killed."
    end
    finish_job job_id
  end
end
#monitor_jobs ⇒ Object
Monitors jobs and pings storage if they are alive. Currently it monitors only forked processes.
# File 'lib/mongojob/worker.rb', line 189

def monitor_jobs
  @job_pids.each do |job_id, pid|
    # Check if alive
    line = `ps -www -o rss,state -p #{pid}`.split("\n")[1]
    rss = state = nil
    running = true
    if line
      rss, state = line.split ' '
      log.debug "Process #{pid} for job #{job_id} in state #{state}, uses #{rss}k mem"
    else
      # Missing process, which means something went very wrong.
      # TODO: report it!
      log.debug "Process #{pid} for job #{job_id} is missing!"
      running = false
    end

    # Now check if finished, which means it will be in Z (zombie) status
    # TODO: should we use EventMachine#watch_process ?
    if state =~ /Z/
      # Process completed, collect information
      pid, status = Process.wait2 pid
      log.debug "Process #{pid} for job #{job_id} exited with status #{status.exitstatus}"
      running = false
    end

    job = MongoJob.find_job job_id

    if running
      # Still running, so ping database
      # One more thing to check - if the job does not exist, we are killing the process.
      if job
        job.ping
      else
        log.info "Job #{job_id} for process #{pid} is missing, killing"
        Process.kill 'KILL', pid
      end
    else
      # Process not running
      # Check the status of the job - if it is still marked as "working", we should set its
      # status to "failed"
      if job && job.status == 'working'
        job.fail "Process missing."
      end
      # For sure we are not working on it anymore, so remove from the stack
      finish_job job_id
    end
  end
end
#process_job(job) ⇒ Object
Processes the job, in the child process if forking.
# File 'lib/mongojob/worker.rb', line 139

def process_job(job)
  begin
    log.info "Performing job #{job.id}"
    jo = job.job_object
    jo.log = log
    jo.perform
    log.info "Job #{job.id} completed"
    job.complete
    Model::Worker.increment(id, {:'stats.done' => 1})
  rescue Exception => e
    log.info "Job #{job.id} failed"
    log.info e
    job.fail e
    Model::Worker.increment(id, {:'stats.failed' => 1})
    p e
  end
end
#real_ip ⇒ Object
Retrieves the real IP address of the machine
# File 'lib/mongojob/worker.rb', line 274

def real_ip
  return @real_ip if @real_ip
  begin
    orig, Socket.do_not_reverse_lookup = Socket.do_not_reverse_lookup, true  # turn off reverse DNS resolution temporarily
    UDPSocket.open do |s|
      s.connect '64.233.187.99', 1
      @real_ip = s.addr.last
    end
  ensure
    Socket.do_not_reverse_lookup = orig
  end
  @real_ip
end
#register_signal_handlers ⇒ Object
Registers the various signal handlers a worker responds to.
TERM: Shutdown immediately, stop processing jobs.
INT: Shutdown immediately, stop processing jobs.
QUIT: Shutdown after the current job has finished processing.
# File 'lib/mongojob/worker.rb', line 294

def register_signal_handlers
  trap('TERM') { shutdown! }
  trap('INT')  { shutdown! }
  trap('QUIT') { shutdown }
  log.info "Registered signals"
end
#run ⇒ Object
Runs the worker
# File 'lib/mongojob/worker.rb', line 77

def run
  log.info "Starting worker"
  register_signal_handlers
  EM.run do
    run_defined_tasks
  end
end
#shutdown ⇒ Object
Schedule this worker for shutdown. Will finish processing the current jobs.
# File 'lib/mongojob/worker.rb', line 305

def shutdown
  log.info 'Shutting down...'
  @shutdown = true
end
#shutdown! ⇒ Object
Kill the child and shutdown immediately.
# File 'lib/mongojob/worker.rb', line 311

def shutdown!
  shutdown
  kill_jobs
end
#tick ⇒ Object
Periodically send pings so that we know that the worker is alive. The method also checks stored worker status and shuts down the worker if the stored status indicates failure or timeout.
# File 'lib/mongojob/worker.rb', line 242

def tick
  worker = Model::Worker.find id

  # Shut down if there is no worker status stored
  # shutdown! unless worker

  # Shut down if worker status is different than 'ok'
  # shutdown! unless worker.status == 'ok'

  data = tick_data.merge({
    pinged_at: Time.now,
    status: 'ok',
    queues: @queues
  })
  Model::Worker.tick id, data
end
#tick_data ⇒ Object
Prepares data to be sent along with the tick.
# File 'lib/mongojob/worker.rb', line 260

def tick_data
  {
    hostname: hostname,
    ip: real_ip,
    custom_status: custom_status
  }
end
#work_job ⇒ Object
Contains the working cycle:
- Maintenance stuff
- Get a job
- Run a job
# File 'lib/mongojob/worker.rb', line 89

def work_job
  # MAINTENANCE
  # Are we shutting down?
  if @shutdown
    Kernel.exit!(0) if @current_jobs.size == 0
  end

  # PROCESSING JOBS
  # Get a job
  job = get_new_job
  return unless job
  log.info "Got a new job #{job.id}"
  if job.job_class.fork?
    # Job that requires a fork, perfect for long-running stuff.
    log.debug "Forking the process for job #{job.id}"
    pid = fork do
      process_job job
    end
    @job_pids[job.id] = pid
    # TODO: We need to store which PID corresponds to this job
  elsif job.job_class.fiber?
    # A job that requires a separate fiber.
    log.debug "Creating a new fiber for job #{job.id}"
    Fiber.new do
      process_job job
      finish_job job
    end.resume
  else
    # Old-school, blocking job
    log.debug "Running job #{job.id} in the blocking mode"
    process_job job
    finish_job job
  end
end