Class: Beanpicker::Worker::Child

Inherits:
Object
  • Object
show all
Includes:
MsgLogger
Defined in:
lib/beanpicker/job_server.rb

Overview

Child is the class that handle the job. Every job can be one or more childs

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from MsgLogger

#debug, #error, #fatal, #info, #log_handler=, #msg, #warn

Constructor Details

#initialize(job, opts = {}, number = 0, worker = nil, &blk) ⇒ Child

Create a new job, start a fork if @opts and start the work



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/beanpicker/job_server.rb', line 157

def initialize(job, opts={}, number=0, worker=nil, &blk)
  @job_name    = job
  @opts        = {
    :childs      => Beanpicker::default_childs_number,
    :fork_every  => Beanpicker::default_fork_every,
    :fork_master => Beanpicker::default_fork_master
  }.merge(opts)
  @number      = number
  @blk         = blk
  @loop        = nil
  @beanstalk   = Beanpicker::new_beanstalk
  @run         = true
  @job         = nil
  @worker      = worker
  if @opts[:fork]
    _fork_every  = @opts[:fork].to_s == 'every'
    _fork_master = @opts[:fork].to_s == 'master'
  else
    _fork_every  = !!@opts[:fork_every]
    _fork_master = !!@opts[:fork_master]
  end
  #really need self
  self.log_handler = @opts[:log_file] unless @opts[:log_file].nil?
  @fork_every  = Beanpicker::fork_every.nil?  ? _fork_every  : Beanpicker::fork_every
  @fork_master = Beanpicker::fork_master.nil? ? _fork_master : Beanpicker::fork_master
  @fork_master_pid = nil
  @fork_every_pid  = nil
  start_watch
  start_loop
end

Instance Attribute Details

#fork_everyObject (readonly)

Should fork every job?



144
145
146
# File 'lib/beanpicker/job_server.rb', line 144

def fork_every
  @fork_every
end

#fork_every_pidObject (readonly)

The pid of running job



148
149
150
# File 'lib/beanpicker/job_server.rb', line 148

def fork_every_pid
  @fork_every_pid
end

#fork_masterObject (readonly)

Should fork the child process?



146
147
148
# File 'lib/beanpicker/job_server.rb', line 146

def fork_master
  @fork_master
end

#fork_master_pidObject (readonly)

The pid of forked child process



150
151
152
# File 'lib/beanpicker/job_server.rb', line 150

def fork_master_pid
  @fork_master_pid
end

#job_nameObject (readonly)

The name of job



140
141
142
# File 'lib/beanpicker/job_server.rb', line 140

def job_name
  @job_name
end

#numberObject (readonly)

The number of job(generated by Child::process)



142
143
144
# File 'lib/beanpicker/job_server.rb', line 142

def number
  @number
end

#optsObject (readonly)

The default options merged with argument options



152
153
154
# File 'lib/beanpicker/job_server.rb', line 152

def opts
  @opts
end

#workerObject (readonly)

The Worker (father?) or nil



154
155
156
# File 'lib/beanpicker/job_server.rb', line 154

def worker
  @worker
end

Class Method Details

.process(job, opts = {}, worker = nil, &blk) ⇒ Object

Process job. Use opts or Beanpicker::default_childs_number to determine how many childs should be created



132
133
134
135
136
# File 'lib/beanpicker/job_server.rb', line 132

def self.process(job, opts={}, worker=nil, &blk)
  (opts[:childs] || Beanpicker::default_childs_number).times.map do |i|
    Child.new(job, opts, i, worker, &blk)
  end
end

Instance Method Details

#at_exit_to_every_child_forkObject

Create a at_exit to bury the job(if any) and exit

Used by fork



341
342
343
344
345
346
347
348
349
# File 'lib/beanpicker/job_server.rb', line 341

def at_exit_to_every_child_fork
  at_exit do
    Thread.new do
      sleep 1
      Kernel.exit!
    end
    BEANPICKER_FORK[:job].bury rescue nil if BEANPICKER_FORK[:job]
  end
end

#at_exit_to_master_child_forkObject

Create a at_exit to kill the child(if any)

Used by fork_master_child_and_monitor



321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/beanpicker/job_server.rb', line 321

def at_exit_to_master_child_fork
  at_exit do
    pid = BEANPICKER_FORK[:child_pid]
    if pid and pid > 0
      if Process.running?(pid)
        Process.kill "TERM", pid
        sleep 0.1
        if Process.running?(pid)
          sleep 2
          Process.kill "KILL", pid if Process.running?(pid)
        end
      end
    end
    Kernel.exit!
  end
end

#beanstalkObject

Return the beanstalk connection(Child don’t use the global connection)



189
190
191
# File 'lib/beanpicker/job_server.rb', line 189

def beanstalk
  @beanstalk
end

#die!Object

Stop running, kill the thread and kill master os every process



283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
# File 'lib/beanpicker/job_server.rb', line 283

def die!
  @run = false
  @loop.kill if @loop and @loop.alive?

  kill_pid = nil
  if @fork_master
    kill_pid = @fork_master_pid
  elsif @fork_every
    kill_pid = @fork_every_pid
  end

  if kill_pid and kill_pid.is_a?(Integer) and Process.running?(kill_pid)
    debug "Killing child with pid #{kill_pid}"
    Process.kill "TERM", kill_pid
  end

end

#fork(&blk) ⇒ Object

Fork the process if @fork_every and wait or only call the block



255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'lib/beanpicker/job_server.rb', line 255

def fork(&blk)
  if @fork_every
    @fork_every_pid = pid = Kernel.fork do
      BEANPICKER_FORK[:child_every] = true
      Process.die_with_parent
      at_exit_to_every_child_fork
      $0 = "Beanpicker job child #{@job_name}##{@number} of #{Process.ppid}"
      blk.call
    end
    if BEANPICKER_FORK[:child_master]
      BEANPICKER_FORK[:child_pid] = pid
      Process.waitpid pid
      BEANPICKER_FORK[:child_pid] = nil
    else
      Process.waitpid pid
    end
    @fork_every_pid = nil
  else
    blk.call
  end
end

#fork_master_child_and_monitorObject

Crete a new fork, change the name and create a thread to restart the fork if it die

Called by start_loop when fork_master



304
305
306
307
308
309
310
311
312
313
314
315
316
# File 'lib/beanpicker/job_server.rb', line 304

def fork_master_child_and_monitor
  @fork_master_pid = Kernel.fork do
    at_exit_to_master_child_fork
    Process.die_with_parent
    BEANPICKER_FORK[:child_master] = true
    $0 = "Beanpicker master child #{@job_name}##{@number}"
    work_loop(self)
  end
  @loop = Thread.new(self) do |child|
    Process.waitpid @fork_master_pid
    child.fork_master_child_and_monitor if child.running?
  end
end

#log_handlerObject

Return the own log_handler, or the Worker log_handler or the global log_handler



352
353
354
355
# File 'lib/beanpicker/job_server.rb', line 352

def log_handler
  #'@log_handler || ' go to worker/global log_handler even if @log_handler is defined
  defined?(@log_handler) ? @log_handler : @worker.nil? ? Beanpicker::log_handler : @worker.log_handler
end

#running?Boolean

The child still working?

Returns:

  • (Boolean)


278
279
280
# File 'lib/beanpicker/job_server.rb', line 278

def running?
  @run
end

#start_loopObject

Start the loop, fork if needed



202
203
204
205
206
207
208
209
210
211
# File 'lib/beanpicker/job_server.rb', line 202

def start_loop
  return false if @loop and @loop.alive?
  if @fork_master
    fork_master_child_and_monitor
  else
    @loop = Thread.new(self) do |child|
      work_loop(child)
    end
  end
end

#start_watchObject

Watch the tube with job name and ignore all others



194
195
196
197
198
199
# File 'lib/beanpicker/job_server.rb', line 194

def start_watch
  beanstalk.watch(@job_name)
  beanstalk.list_tubes_watched.each do |server, tubes|
    tubes.each { |tube| beanstalk.ignore(tube) unless tube == @job_name }
  end
end

#start_work(child = self) ⇒ Object

Here is all the magic :)

Call fork and start the job and delete on end, bury the job if a error is raised



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/beanpicker/job_server.rb', line 221

def start_work(child=self)
  fork do
    begin
      @job = child.beanstalk.reserve
      BEANPICKER_FORK[:job] = @job if BEANPICKER_FORK[:child_every]
      data  = @job.ybody

      if not data.is_a?(Hash) or [:args, :next_jobs] - data.keys != []
        data = { :args => data, :next_jobs => [] }
      end

      t=Time.now
      debug "Running #{@job_name}##{@number} with args #{data[:args]}; next jobs #{data[:next_jobs]}"
      r = @blk.call(data[:args].clone)
      debug "Job #{@job_name}##{@number} finished in #{Time.now-t} seconds with return #{r}"
      data[:args].merge!(r) if r.is_a?(Hash) and data[:args].is_a?(Hash)

      @job.delete

      Beanpicker.enqueue(data[:next_jobs], data[:args]) if r and not data[:next_jobs].empty?
    rescue => e
      fatal Beanpicker::exception_message(e, "in loop of #{@job_name}##{@number} with pid #{Process.pid}")
      if BEANPICKER_FORK[:child_every]
        exit
      else
        Thread.new(@job) { |j| j.bury rescue nil }
      end
    ensure
      @job = nil
    end
  end
end

#work_loop(child) ⇒ Object

call start_work passing itself(child) while @run



214
215
216
# File 'lib/beanpicker/job_server.rb', line 214

def work_loop(child)
  start_work(child) while @run
end