Class: Beanpicker::Worker::Child
- Inherits:
-
Object
- Object
- Beanpicker::Worker::Child
- Includes:
- MsgLogger
- Defined in:
- lib/beanpicker/job_server.rb
Overview
Child is the class that handle the job. Every job can be one or more childs
Instance Attribute Summary collapse
-
#fork_every ⇒ Object
readonly
Should fork every job?.
-
#fork_every_pid ⇒ Object
readonly
The pid of running job.
-
#fork_master ⇒ Object
readonly
Should fork the child process?.
-
#fork_master_pid ⇒ Object
readonly
The pid of forked child process.
-
#job_name ⇒ Object
readonly
The name of job.
-
#number ⇒ Object
readonly
The number of job(generated by Child::process).
-
#opts ⇒ Object
readonly
The default options merged with argument options.
-
#worker ⇒ Object
readonly
The Worker (father?) or nil.
Class Method Summary collapse
Instance Method Summary collapse
-
#at_exit_to_every_child_fork ⇒ Object
Create a at_exit to bury the job(if any) and exit.
-
#at_exit_to_master_child_fork ⇒ Object
Create a at_exit to kill the child(if any).
-
#beanstalk ⇒ Object
Return the beanstalk connection(Child don’t use the global connection).
-
#die! ⇒ Object
Stop running, kill the thread and kill master os every process.
-
#fork(&blk) ⇒ Object
Fork the process if @fork_every and wait or only call the block.
-
#fork_master_child_and_monitor ⇒ Object
Crete a new fork, change the name and create a thread to restart the fork if it die.
-
#initialize(job, opts = {}, number = 0, worker = nil, &blk) ⇒ Child
constructor
Create a new job, start a fork if @opts and start the work.
-
#log_handler ⇒ Object
Return the own log_handler, or the Worker log_handler or the global log_handler.
-
#running? ⇒ Boolean
The child still working?.
-
#start_loop ⇒ Object
Start the loop, fork if needed.
-
#start_watch ⇒ Object
Watch the tube with job name and ignore all others.
-
#start_work(child = self) ⇒ Object
Here is all the magic :).
-
#work_loop(child) ⇒ Object
call start_work passing itself(child) while @run.
Methods included from MsgLogger
#debug, #error, #fatal, #info, #log_handler=, #msg, #warn
Constructor Details
#initialize(job, opts = {}, number = 0, worker = nil, &blk) ⇒ Child
Create a new job, start a fork if @opts and start the work
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# File 'lib/beanpicker/job_server.rb', line 157 def initialize(job, opts={}, number=0, worker=nil, &blk) @job_name = job @opts = { :childs => Beanpicker::default_childs_number, :fork_every => Beanpicker::default_fork_every, :fork_master => Beanpicker::default_fork_master }.merge(opts) @number = number @blk = blk @loop = nil @beanstalk = Beanpicker::new_beanstalk @run = true @job = nil @worker = worker if @opts[:fork] _fork_every = @opts[:fork].to_s == 'every' _fork_master = @opts[:fork].to_s == 'master' else _fork_every = !!@opts[:fork_every] _fork_master = !!@opts[:fork_master] end #really need self self.log_handler = @opts[:log_file] unless @opts[:log_file].nil? @fork_every = Beanpicker::fork_every.nil? ? _fork_every : Beanpicker::fork_every @fork_master = Beanpicker::fork_master.nil? ? _fork_master : Beanpicker::fork_master @fork_master_pid = nil @fork_every_pid = nil start_watch start_loop end |
Instance Attribute Details
#fork_every ⇒ Object (readonly)
Should fork every job?
144 145 146 |
# File 'lib/beanpicker/job_server.rb', line 144 def fork_every @fork_every end |
#fork_every_pid ⇒ Object (readonly)
The pid of running job
148 149 150 |
# File 'lib/beanpicker/job_server.rb', line 148 def fork_every_pid @fork_every_pid end |
#fork_master ⇒ Object (readonly)
Should fork the child process?
146 147 148 |
# File 'lib/beanpicker/job_server.rb', line 146 def fork_master @fork_master end |
#fork_master_pid ⇒ Object (readonly)
The pid of forked child process
150 151 152 |
# File 'lib/beanpicker/job_server.rb', line 150 def fork_master_pid @fork_master_pid end |
#job_name ⇒ Object (readonly)
The name of job
140 141 142 |
# File 'lib/beanpicker/job_server.rb', line 140 def job_name @job_name end |
#number ⇒ Object (readonly)
The number of job(generated by Child::process)
142 143 144 |
# File 'lib/beanpicker/job_server.rb', line 142 def number @number end |
#opts ⇒ Object (readonly)
The default options merged with argument options
152 153 154 |
# File 'lib/beanpicker/job_server.rb', line 152 def opts @opts end |
#worker ⇒ Object (readonly)
The Worker (father?) or nil
154 155 156 |
# File 'lib/beanpicker/job_server.rb', line 154 def worker @worker end |
Class Method Details
.process(job, opts = {}, worker = nil, &blk) ⇒ Object
Process job. Use opts or Beanpicker::default_childs_number to determine how many childs should be created
132 133 134 135 136 |
# File 'lib/beanpicker/job_server.rb', line 132 def self.process(job, opts={}, worker=nil, &blk) (opts[:childs] || Beanpicker::default_childs_number).times.map do |i| Child.new(job, opts, i, worker, &blk) end end |
Instance Method Details
#at_exit_to_every_child_fork ⇒ Object
Create a at_exit to bury the job(if any) and exit
Used by fork
341 342 343 344 345 346 347 348 349 |
# File 'lib/beanpicker/job_server.rb', line 341 def at_exit_to_every_child_fork at_exit do Thread.new do sleep 1 Kernel.exit! end BEANPICKER_FORK[:job].bury rescue nil if BEANPICKER_FORK[:job] end end |
#at_exit_to_master_child_fork ⇒ Object
Create a at_exit to kill the child(if any)
Used by fork_master_child_and_monitor
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 |
# File 'lib/beanpicker/job_server.rb', line 321 def at_exit_to_master_child_fork at_exit do pid = BEANPICKER_FORK[:child_pid] if pid and pid > 0 if Process.running?(pid) Process.kill "TERM", pid sleep 0.1 if Process.running?(pid) sleep 2 Process.kill "KILL", pid if Process.running?(pid) end end end Kernel.exit! end end |
#beanstalk ⇒ Object
Return the beanstalk connection(Child don’t use the global connection)
189 190 191 |
# File 'lib/beanpicker/job_server.rb', line 189 def beanstalk @beanstalk end |
#die! ⇒ Object
Stop running, kill the thread and kill master os every process
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 |
# File 'lib/beanpicker/job_server.rb', line 283 def die! @run = false @loop.kill if @loop and @loop.alive? kill_pid = nil if @fork_master kill_pid = @fork_master_pid elsif @fork_every kill_pid = @fork_every_pid end if kill_pid and kill_pid.is_a?(Integer) and Process.running?(kill_pid) debug "Killing child with pid #{kill_pid}" Process.kill "TERM", kill_pid end end |
#fork(&blk) ⇒ Object
Fork the process if @fork_every and wait or only call the block
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# File 'lib/beanpicker/job_server.rb', line 255 def fork(&blk) if @fork_every @fork_every_pid = pid = Kernel.fork do BEANPICKER_FORK[:child_every] = true Process.die_with_parent at_exit_to_every_child_fork $0 = "Beanpicker job child #{@job_name}##{@number} of #{Process.ppid}" blk.call end if BEANPICKER_FORK[:child_master] BEANPICKER_FORK[:child_pid] = pid Process.waitpid pid BEANPICKER_FORK[:child_pid] = nil else Process.waitpid pid end @fork_every_pid = nil else blk.call end end |
#fork_master_child_and_monitor ⇒ Object
Crete a new fork, change the name and create a thread to restart the fork if it die
Called by start_loop when fork_master
304 305 306 307 308 309 310 311 312 313 314 315 316 |
# File 'lib/beanpicker/job_server.rb', line 304 def fork_master_child_and_monitor @fork_master_pid = Kernel.fork do at_exit_to_master_child_fork Process.die_with_parent BEANPICKER_FORK[:child_master] = true $0 = "Beanpicker master child #{@job_name}##{@number}" work_loop(self) end @loop = Thread.new(self) do |child| Process.waitpid @fork_master_pid child.fork_master_child_and_monitor if child.running? end end |
#log_handler ⇒ Object
Return the own log_handler, or the Worker log_handler or the global log_handler
352 353 354 355 |
# File 'lib/beanpicker/job_server.rb', line 352 def log_handler #'@log_handler || ' go to worker/global log_handler even if @log_handler is defined defined?(@log_handler) ? @log_handler : @worker.nil? ? Beanpicker::log_handler : @worker.log_handler end |
#running? ⇒ Boolean
The child still working?
278 279 280 |
# File 'lib/beanpicker/job_server.rb', line 278 def running? @run end |
#start_loop ⇒ Object
Start the loop, fork if needed
202 203 204 205 206 207 208 209 210 211 |
# File 'lib/beanpicker/job_server.rb', line 202 def start_loop return false if @loop and @loop.alive? if @fork_master fork_master_child_and_monitor else @loop = Thread.new(self) do |child| work_loop(child) end end end |
#start_watch ⇒ Object
Watch the tube with job name and ignore all others
194 195 196 197 198 199 |
# File 'lib/beanpicker/job_server.rb', line 194 def start_watch beanstalk.watch(@job_name) beanstalk.list_tubes_watched.each do |server, tubes| tubes.each { |tube| beanstalk.ignore(tube) unless tube == @job_name } end end |
#start_work(child = self) ⇒ Object
Here is all the magic :)
Call fork and start the job and delete on end, bury the job if a error is raised
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
# File 'lib/beanpicker/job_server.rb', line 221 def start_work(child=self) fork do begin @job = child.beanstalk.reserve BEANPICKER_FORK[:job] = @job if BEANPICKER_FORK[:child_every] data = @job.ybody if not data.is_a?(Hash) or [:args, :next_jobs] - data.keys != [] data = { :args => data, :next_jobs => [] } end t=Time.now debug "Running #{@job_name}##{@number} with args #{data[:args]}; next jobs #{data[:next_jobs]}" r = @blk.call(data[:args].clone) debug "Job #{@job_name}##{@number} finished in #{Time.now-t} seconds with return #{r}" data[:args].merge!(r) if r.is_a?(Hash) and data[:args].is_a?(Hash) @job.delete Beanpicker.enqueue(data[:next_jobs], data[:args]) if r and not data[:next_jobs].empty? rescue => e fatal Beanpicker::(e, "in loop of #{@job_name}##{@number} with pid #{Process.pid}") if BEANPICKER_FORK[:child_every] exit else Thread.new(@job) { |j| j.bury rescue nil } end ensure @job = nil end end end |
#work_loop(child) ⇒ Object
call start_work passing itself(child) while @run
214 215 216 |
# File 'lib/beanpicker/job_server.rb', line 214 def work_loop(child) start_work(child) while @run end |