Module: Bj::Runner::Instance_Methods

Defined in:
lib/bj/runner.rb

Instance Method Summary collapse

Instance Method Details

#archive_jobsObject



312
313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/bj/runner.rb', line 312

def archive_jobs
  Bj.transaction do
    now = Time.now
    too_old = now - Bj.ttl
    jobs = Bj::Table::Job.find :all,
                               :conditions => ["(state = 'finished' or state = 'dead') and submitted_at < ?", too_old]
    jobs.each do |job|
      Bj.logger.info{ "#{ job.title } - archived" }
      hash = job.to_hash.update(:archived_at => now)
      Bj::Table::JobArchive.create! hash 
      job.destroy
    end
  end
end

#fill_morgueObject



290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/bj/runner.rb', line 290

def fill_morgue
  Bj.transaction do
    now = Time.now
    jobs = Bj::Table::Job.find :all,
                               :conditions => ["state = 'running' and runner = ?", Bj.hostname]
    jobs.each do |job|
      if job.is_restartable?
        Bj.logger.info{ "#{ job.title } - found dead and bloated but resubmitted" }
        %w[ runner pid started_at finished_at stdout stderr exit_status ].each do |column|
          job[column] = nil
        end
        job.state = 'pending'
      else
        Bj.logger.info{ "#{ job.title } - found dead and bloated" }
        job.state = 'dead'
        job.finished_at = now
      end
      job.save!
    end
  end
end

#initialize(options = {}, &block) ⇒ Object



150
151
152
153
# File 'lib/bj/runner.rb', line 150

def initialize options = {}, &block
  options.to_options!
  @options, @block = options, block
end

#install_signal_handlersObject



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/bj/runner.rb', line 259

def install_signal_handlers
  Runner.hup_signaled false
  hup_handler = nil
  hup_handler =
    trap Runner.hup_signal do |*a|
      begin
        Runner.hup_signaled true
      rescue Exception => e
        Bj.logger.error{ e } rescue nil
      end
      hup_handler.call *a rescue nil
    end

  Runner.kill_signaled false
  kill_handler = nil
  kill_handler =
    trap Runner.kill_signal do |*a|
      begin
        Runner.kill_signaled true
      rescue Exception => e
        Bj.logger.error{ e } rescue nil
      end
      kill_handler.call *a rescue nil
    end

  begin
    trap("INT"){ exit }
  rescue Exception
  end
end

#keyObject



353
354
355
# File 'lib/bj/runner.rb', line 353

def key
  @key ||= ( options[:ppid] ? Runner.key(options[:ppid]) : Runner.key )
end

#ping_parentObject



247
248
249
250
251
252
253
254
255
256
257
# File 'lib/bj/runner.rb', line 247

def ping_parent
  ppid = options[:ppid]
  return unless ppid 
  begin
    Process.kill 0, Integer(ppid)
  rescue Errno::ESRCH
    Kernel.exit 42
  rescue Exception
    42
  end
end

#registerObject



327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
# File 'lib/bj/runner.rb', line 327

def register
  Bj.transaction do
    pid = Bj.config[key]
    return false if Util.alive?(pid)
    Bj.config[key] = Process.pid
    unless Bj.util.ipc_signals_supported? # not winblows
      require "drb"
      DRb.start_service "druby://localhost:0", Process
      Bj.config["#{ Process.pid }.uri"] = DRb.uri 
    end
  end
  at_exit{ unregister }
  true
rescue Exception
  false
end

#runObject



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/bj/runner.rb', line 155

def run
  wait = options[:wait] || 42
  limit = options[:limit]
  forever = options[:forever]

  limit = false if forever
  wait = Integer wait
  loopno = 0

  Runner.thread = Thread.current
  Bj.chroot

  register or exit!(EXIT::WARNING)

  Bj.logger.info{ "STARTED" }
  at_exit{ Bj.logger.info{ "STOPPED" } }

  fill_morgue
  install_signal_handlers

  loop do
    ping_parent

    loopno += 1
    break if(limit and loopno > limit)

    archive_jobs

    catch :no_jobs do
      loop do
        job = thread = stdout = stderr = nil

        Bj.transaction(options) do
          now = Time.now

          job = Bj::Table::Job.find :first,
                                    :conditions => ["state = ? and submitted_at <= ?", "pending", now],
                                    :order => "priority DESC, submitted_at ASC", 
                                    :limit => 1,
                                    :lock => true
          throw :no_jobs unless job


          Bj.logger.info{ "#{ job.title } - started" }

          command = job.command
          env = job.env ? YAML.load(job.env) : {}
          stdin = job.stdin || ''
          stdout = job.stdout || ''
          stderr = job.stderr || ''
          started_at = Time.now

          thread = Util.start command, :cwd=>Bj.rails_root, :env=>env, :stdin=>stdin, :stdout=>stdout, :stderr=>stderr

          job.state = "running"
          job.runner = Bj.hostname
          job.pid = thread.pid
          job.started_at = started_at 
          job.save!
          job.reload
        end

        exit_status = thread.value
        finished_at = Time.now

        Bj.transaction(options) do
          job = Bj::Table::Job.find job.id 
          break unless job
          job.state = "finished"
          job.finished_at = finished_at 
          job.stdout = stdout
          job.stderr = stderr
          job.exit_status = exit_status
          job.save!
          job.reload
          Bj.logger.info{ "#{ job.title } - exit_status=#{ job.exit_status }" }
        end
      end
    end

    Runner.hup_signaled false
    wait.times do
      break if Runner.hup_signaled?
      break if Runner.kill_signaled?
      sleep 1
    end 

    break unless(limit or limit == false)
    break if Runner.kill_signaled?
  end
end

#unregisterObject



344
345
346
347
348
349
350
351
# File 'lib/bj/runner.rb', line 344

def unregister
  Bj.transaction do
    Bj.config.delete key
  end
  true
rescue Exception
  false
end