Module: Bj::Runner::Instance_Methods

Defined in:
lib/bj/runner.rb

Instance Method Summary collapse

Instance Method Details

#archive_jobs(options = {}) ⇒ Object



266
267
268
269
270
271
272
273
274
275
276
277
278
279
# File 'lib/bj/runner.rb', line 266

# Moves expired jobs out of the live job table and into the archive table.
#
# A job qualifies when its state is 'finished' or 'dead' and its
# submitted_at is earlier than (Time.now - Bj.ttl). Each qualifying job is
# logged, copied into Bj::Table::JobArchive with an added :archived_at
# timestamp, and then destroyed. Everything runs inside one Bj.transaction.
#
# options - passed through unchanged to Bj.transaction.
#
# Returns whatever Bj.transaction returns for the block.
def archive_jobs(options = {})
  Bj.transaction(options) do
    archived_at = Time.now
    cutoff = archived_at - Bj.ttl

    expired = Bj::Table::Job.find(
      :all,
      :conditions => ["(state = 'finished' or state = 'dead') and submitted_at < ?", cutoff]
    )

    expired.each do |job|
      Bj.logger.info { "#{ job.title } - archived" }
      # Copy first, delete second, so a failed create! never loses the job.
      Bj::Table::JobArchive.create!(job.to_hash.update(:archived_at => archived_at))
      job.destroy
    end
  end
end

#fill_morgue(options = {}) ⇒ Object



244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/bj/runner.rb', line 244

# Cleans up jobs that are still marked 'running' for this host.
#
# Every job whose state is 'running' and whose runner equals Bj.hostname is
# either:
#   * reset to 'pending' with its run-related columns cleared, when
#     job.is_restartable? is truthy, or
#   * marked 'dead' with finished_at set to the current time otherwise.
# Each job is saved with save!; the whole pass runs inside one Bj.transaction.
#
# options - passed through unchanged to Bj.transaction.
#
# Returns whatever Bj.transaction returns for the block.
def fill_morgue(options = {})
  Bj.transaction(options) do
    now = Time.now

    orphans = Bj::Table::Job.find(
      :all,
      :conditions => ["state = 'running' and runner = ?", Bj.hostname]
    )

    orphans.each do |job|
      if job.is_restartable?
        Bj.logger.info { "#{ job.title } - found dead and bloated but resubmitted" }
        # Wipe everything recorded by the previous attempt before requeueing.
        %w[ runner pid started_at finished_at stdout stderr exit_status ].each { |column| job[column] = nil }
        job.state = 'pending'
      else
        Bj.logger.info { "#{ job.title } - found dead and bloated" }
        job.state = 'dead'
        job.finished_at = now
      end

      job.save!
    end
  end
end

#initialize(options = {}, &block) ⇒ Object



123
124
125
126
# File 'lib/bj/runner.rb', line 123

# Stores the runner's configuration.
#
# options - Hash of runner options. It is normalized in place with
#           to_options! (presumably the ActiveSupport key-symbolizing
#           helper - confirm), so the caller's hash is modified too.
# block   - optional block, kept in @block.
def initialize(options = {}, &block)
  options.to_options!
  @options, @block = [options, block]
end

#install_signal_handlers ⇒ Object



230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/bj/runner.rb', line 230

# Installs the process signal handlers used by the runner.
#
# * Runner.signal is trapped and forwarded to Runner.signaled!; any
#   exception raised there is logged through Bj.logger.error instead of
#   escaping the handler.
# * "INT" is trapped to exit the process, unless "INT" is itself the runner
#   signal.
#
# Installation is best effort: if anything raises while installing the
# handlers, the error is swallowed and 42 is returned.
#
# NOTE(review): on newer Rubies a logger may not be usable from inside a trap
# handler - confirm that the error logging below works there.
def install_signal_handlers
  begin
    trap(Runner.signal) do
      begin
        Runner.signaled!
      rescue Exception => error
        Bj.logger.error { error }
      end
    end

    trap("INT") { exit } if Runner.signal != "INT"
  rescue Exception
    42
  end
end

#ping_parent ⇒ Object



218
219
220
221
222
223
224
225
226
227
228
# File 'lib/bj/runner.rb', line 218

# Checks that the parent process named in options[:ppid] still exists.
#
# Does nothing (returns nil) when no :ppid option is set. Otherwise sends
# signal 0 to the parent pid, which only tests for existence:
#   * the parent exists  - returns the result of Process.kill;
#   * Errno::ESRCH       - the parent is gone, so this process exits with
#                          status 42;
#   * any other error    - swallowed; 42 is returned.
def ping_parent
  parent_pid = options[:ppid]

  if parent_pid
    begin
      Process.kill(0, Integer(parent_pid))
    rescue Errno::ESRCH
      Kernel.exit(42)
    rescue Exception
      42
    end
  end
end

#register(options = {}) ⇒ Object



281
282
283
284
285
286
287
288
289
290
291
# File 'lib/bj/runner.rb', line 281

# Claims the runner slot for this process.
#
# Inside a Bj.transaction, reads the pid stored under Runner.key in
# Bj.config. If Util.alive? reports that pid as alive, registration is
# refused; otherwise the current Process.pid is stored. On success an
# at_exit hook is added that calls unregister.
#
# options - passed through unchanged to Bj.transaction.
#
# Returns true when this process was registered, false when another live
# runner holds the slot or when any exception was raised.
def register(options = {})
  begin
    Bj.transaction(options) do
      holder_pid = Bj.config[Runner.key]

      if Util.alive?(holder_pid)
        return false
      else
        Bj.config[Runner.key] = Process.pid
      end
    end

    at_exit { unregister }
    true
  rescue Exception
    false
  end
end

#run ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/bj/runner.rb', line 128

# Main loop of the runner: registers this process, then repeatedly picks up
# pending jobs, runs them one at a time and records their results.
#
# Options read from +options+:
#   :wait    - seconds to idle between polling passes (default 42, coerced
#              with Integer()).
#   :limit   - maximum number of passes of the outer loop (used as given; it
#              is not coerced here).
#   :forever - when truthy, :limit is ignored and the loop does not stop on
#              its own.
#
# When neither :limit nor :forever is given, the outer loop makes exactly one
# pass (including the idle wait) and then returns.
def run
  wait = options[:wait] || 42
  limit = options[:limit]
  forever = options[:forever]

  # limit == false is the marker for "run forever"; nil means "single pass".
  limit = false if forever
  wait = Integer wait
  loopno = 0

  Runner.thread = Thread.current
  Bj.chroot

  # Only one registered runner is allowed; exit! terminates immediately
  # without running at_exit handlers.
  register or exit!(EXIT::WARNING)

  Bj.logger.info{ "STARTED" }
  at_exit{ Bj.logger.info{ "STOPPED" } }

  # Deal with jobs still marked 'running' for this host before taking new ones.
  fill_morgue
  install_signal_handlers

  loop do
    # May exit the process if the parent given by options[:ppid] is gone.
    ping_parent

    loopno += 1
    break if(limit and loopno > limit)

    archive_jobs

    # Drain the queue: the inner loop runs jobs until none is pending, at
    # which point :no_jobs is thrown to leave it.
    catch :no_jobs do
      loop do
        job = thread = stdout = stderr = nil

        # First transaction: claim the next pending job and start it.
        # NOTE(review): :lock => true presumably takes a row lock so that
        # concurrent runners cannot claim the same job - confirm.
        Bj.transaction(options) do
          now = Time.now

          job = Bj::Table::Job.find :first,
                                    :conditions => ["state = ? and submitted_at <= ?", "pending", now],
                                    :order => "priority DESC, submitted_at ASC", 
                                    :limit => 1,
                                    :lock => true
          throw :no_jobs unless job


          Bj.logger.info{ "#{ job.title } - started" }

          command = job.command
          env = job.env || {}
          stdin = job.stdin || ''
          stdout = job.stdout || ''
          stderr = job.stderr || ''
          started_at = Time.now

          # NOTE(review): stdout/stderr are presumably filled in place by
          # Util.start while the command runs - confirm; they are written
          # back to the job below.
          thread = Util.start command, :cwd=>Bj.rails_root, :env=>env, :stdin=>stdin, :stdout=>stdout, :stderr=>stderr

          job.state = "running"
          job.runner = Bj.hostname
          job.pid = thread.pid
          job.started_at = started_at 
          job.save!
          job.reload
        end

        # Block until the command finishes; the thread's value is stored as
        # the job's exit status.
        exit_status = thread.value
        finished_at = Time.now

        # Second transaction: re-read the job and record the outcome.
        Bj.transaction(options) do
          job = Bj::Table::Job.find job.id 
          # Skip the update if the job can no longer be found.
          break unless job
          job.state = "finished"
          job.finished_at = finished_at 
          job.stdout = stdout
          job.stderr = stderr
          job.exit_status = exit_status
          job.save!
          job.reload
          Bj.logger.info{ "#{ job.title } - exit_status=#{ job.exit_status }" }
        end
      end
    end

    # Idle for up to `wait` seconds, in one-second steps so a signal
    # (Runner.signaled?) can cut the wait short.
    # NOTE(review): Runner.signaled false presumably clears the flag - confirm.
    Runner.signaled false
    wait.times do
      sleep 1
      break if Runner.signaled?
    end 

    # Keep looping only when a limit is set or forever was requested
    # (limit == false); otherwise stop after this single pass.
    break unless(limit or limit == false)
  end
end

#unregister(options = {}) ⇒ Object



293
294
295
296
297
298
299
300
# File 'lib/bj/runner.rb', line 293

# Releases the runner slot by deleting Runner.key from Bj.config inside a
# Bj.transaction.
#
# options - passed through unchanged to Bj.transaction.
#
# Returns true on success, false when any exception was raised.
def unregister(options = {})
  begin
    Bj.transaction(options) { Bj.config.delete(Runner.key) }
    true
  rescue Exception
    false
  end
end