Module: Bj::Runner::Instance_Methods

Defined in:
lib/bj/runner.rb

Instance Method Summary collapse

Instance Method Details

#archive_jobs ⇒ Object



284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/bj/runner.rb', line 284

# Moves expired jobs out of the jobs table and into the job archive.
#
# Inside a single Bj.transaction, every job whose state is 'finished' or
# 'dead' and whose submitted_at is earlier than (now - Bj.ttl) is logged,
# copied into Bj::Table::JobArchive with :archived_at set to the time the
# sweep started, and then destroyed.
def archive_jobs
  Bj.transaction do
    archived_at = Time.now
    cutoff = archived_at - Bj.ttl
    expired = Bj::Table::Job.find(
      :all,
      :conditions => ["(state = 'finished' or state = 'dead') and submitted_at < ?", cutoff]
    )

    expired.each do |job|
      Bj.logger.info { "#{ job.title } - archived" }
      # The archive row is the job's own attributes plus the sweep timestamp.
      archive_attributes = job.to_hash.update(:archived_at => archived_at)
      Bj::Table::JobArchive.create!(archive_attributes)
      job.destroy
    end
  end
end

#fill_morgue ⇒ Object



262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
# File 'lib/bj/runner.rb', line 262

# Cleans up jobs that are still marked as running for this runner's host.
#
# Inside a single Bj.transaction, every job with state 'running' whose runner
# column equals Bj.hostname is either:
# * reset to 'pending' with its run bookkeeping columns cleared, when
#   job.is_restartable? is true, or
# * marked 'dead' with finished_at set to the time the sweep started.
# Each job is saved with save! after being updated.
def fill_morgue
  Bj.transaction do
    now = Time.now
    stale_jobs = Bj::Table::Job.find(
      :all,
      :conditions => ["state = 'running' and runner = ?", Bj.hostname]
    )

    stale_jobs.each do |job|
      if job.is_restartable?
        Bj.logger.info { "#{ job.title } - found dead and bloated but resubmitted" }
        # Clear everything recorded by the interrupted run before re-queueing.
        %w[ runner pid started_at finished_at stdout stderr exit_status ].each { |column| job[column] = nil }
        job.state = 'pending'
      else
        Bj.logger.info { "#{ job.title } - found dead and bloated" }
        job.state = 'dead'
        job.finished_at = now
      end

      job.save!
    end
  end
end

#initialize(options = {}, &block) ⇒ Object



151
152
153
154
# File 'lib/bj/runner.rb', line 151

# Stores the runner's options and optional block.
#
# options - settings hash (defaults to an empty hash). to_options! is called
#           on it, so the object passed in is modified in place.
#           NOTE(review): to_options! is presumably the key-symbolizing Hash
#           extension - confirm where it is defined.
# block   - optional block, kept as-is in @block.
def initialize(options = {}, &block)
  options.to_options!
  @options = options
  @block = block
end

#install_signal_handlers ⇒ Object



231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# File 'lib/bj/runner.rb', line 231

# Resets the runner's signal flags and installs its signal traps.
#
# For Runner.hup_signal and Runner.kill_signal the matching flag is cleared
# first, then a trap is installed that sets the flag and afterwards tries to
# invoke whatever trap returned (the previously installed handler). Errors
# raised while setting the flag are logged on a best-effort basis; errors
# raised while chaining to the previous handler are ignored.
# Finally a trap for "INT" that calls exit is installed; failure to install
# it is ignored.
def install_signal_handlers
  Runner.hup_signaled false
  previous_hup = nil # assigned before the trap call so the block closes over it
  previous_hup = trap(Runner.hup_signal) do |*signal_args|
    begin
      Runner.hup_signaled true
    rescue Exception => error
      Bj.logger.error { error } rescue nil
    end
    previous_hup.call(*signal_args) rescue nil
  end

  Runner.kill_signaled false
  previous_kill = nil # same closure trick as for the hup handler
  previous_kill = trap(Runner.kill_signal) do |*signal_args|
    begin
      Runner.kill_signaled true
    rescue Exception => error
      Bj.logger.error { error } rescue nil
    end
    previous_kill.call(*signal_args) rescue nil
  end

  begin
    trap("INT") { exit }
  rescue Exception
    # best effort only: a failure to trap INT is deliberately ignored
  end
end

#key ⇒ Object



325
326
327
# File 'lib/bj/runner.rb', line 325

# Returns the key identifying this runner, memoized in @key.
#
# When options[:ppid] is set the key is Runner.key(options[:ppid]); otherwise
# it is Runner.key called without arguments.
def key
  return @key if @key

  @key =
    if options[:ppid]
      Runner.key(options[:ppid])
    else
      Runner.key
    end
end

#ping_parent ⇒ Object



219
220
221
222
223
224
225
226
227
228
229
# File 'lib/bj/runner.rb', line 219

# Checks that the parent process named by options[:ppid] still exists.
#
# Does nothing (returns nil) when no :ppid option is set. Otherwise signal 0
# is sent to the parent pid:
# * Errno::ESRCH (no such process) makes this process exit with status 42;
# * any other exception is swallowed and 42 is returned;
# * on success the result of Process.kill is returned.
def ping_parent
  parent_pid = options[:ppid]

  if parent_pid
    begin
      Process.kill(0, Integer(parent_pid))
    rescue Errno::ESRCH
      Kernel.exit(42)
    rescue Exception
      42
    end
  end
end

#register ⇒ Object



299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'lib/bj/runner.rb', line 299

# Claims this runner's slot in Bj.config.
#
# Inside a Bj.transaction:
# * returns false straight away if Util.alive? reports that the pid currently
#   stored under key is alive;
# * otherwise stores Process.pid under key;
# * when Bj.util.ipc_signals_supported? is false, starts a DRb service on
#   "druby://localhost:0" fronting Process and records its URI under
#   "<pid>.uri".
# After the transaction an at_exit hook calling unregister is installed.
#
# Returns true on success, false if another runner holds the slot or if any
# exception is raised.
def register
  begin
    Bj.transaction do
      registered_pid = Bj.config[key]
      return false if Util.alive?(registered_pid)

      Bj.config[key] = Process.pid

      # platforms without IPC signal support fall back to DRb
      unless Bj.util.ipc_signals_supported?
        require "drb"
        DRb.start_service("druby://localhost:0", Process)
        Bj.config["#{ Process.pid }.uri"] = DRb.uri
      end
    end

    at_exit { unregister }
    true
  rescue Exception
    false
  end
end

#run ⇒ Object



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/bj/runner.rb', line 156

# Main loop of the runner.
#
# Options read:
#   :wait    - seconds to idle between passes (default 42, coerced with Integer)
#   :limit   - maximum number of passes
#   :forever - when truthy, :limit is ignored and the loop never stops on its own
#
# Setup: records the current thread on Runner, calls Bj.chroot, registers the
# runner (hard-exiting with EXIT::WARNING if registration fails), logs
# "STARTED", arranges for "STOPPED" to be logged at exit, then runs
# fill_morgue and install_signal_handlers.
#
# Each pass: pings the parent, stops if the pass limit is exceeded, archives
# old jobs, then runs pending jobs one after another (highest priority first,
# oldest submission first) until none is left. It then idles up to :wait
# seconds, waking early on a hup or kill signal. The loop ends after one pass
# when no limit was given (limit is nil), or when a kill was signaled.
def run
  wait = Integer(options[:wait] || 42)
  forever = options[:forever]
  limit = forever ? false : options[:limit]
  passes = 0

  Runner.thread = Thread.current
  Bj.chroot

  exit!(EXIT::WARNING) unless register

  Bj.logger.info { "STARTED" }
  at_exit { Bj.logger.info { "STOPPED" } }

  fill_morgue
  install_signal_handlers

  loop do
    ping_parent

    passes += 1
    break if limit && passes > limit

    archive_jobs

    catch(:no_jobs) do
      loop do
        # declared outside the transaction block so it stays visible after it
        job = nil

        Bj.transaction(options) do
          now = Time.now

          job = Bj::Table::Job.find(
            :first,
            :conditions => ["state = ? and submitted_at <= ?", "pending", now],
            :order => "priority DESC, submitted_at ASC",
            :limit => 1,
            :lock => true
          )
          throw :no_jobs unless job

          Bj.logger.info { "#{ job.title } - starting..." }
        end

        job.run!

        Bj.logger.info { "#{ job.title } - exit_status=#{ job.exit_status }" }
      end
    end

    Runner.hup_signaled false
    wait.times do
      break if Runner.hup_signaled? || Runner.kill_signaled?
      sleep 1
    end

    # limit is nil only when neither :limit nor :forever was given: single pass
    break if limit.nil?
    break if Runner.kill_signaled?
  end
end

#unregister ⇒ Object



316
317
318
319
320
321
322
323
# File 'lib/bj/runner.rb', line 316

# Removes this runner's entry (stored under key) from Bj.config inside a
# Bj.transaction.
#
# Returns true when the delete completed, false if any exception was raised.
def unregister
  begin
    Bj.transaction { Bj.config.delete(key) }
    true
  rescue Exception
    false
  end
end