Class: JobServer

Inherits:
Object
Defined in:
lib/jobserver.rb

Overview

Usage example:

Create job handlers

require 'jobserver'
pre_run_handler = Proc.new do |job|
  puts "Running job #{job.name} on #{job.host}"
  # Initialize the results object as an empty array:
  job.results = []
end
output_handler = Proc.new do |file, job|
  line = file.gets
  job.results << $1 if line =~ /result: (.*)/
end
post_run_handler = Proc.new do |job|
  if job.results.empty?
    puts "Error executing job #{job.name} on #{job.host}.\n\t#{job}"
  else
    puts $result = job.results.join(",")
  end
end

Create the jobs

Job.default_client_command = "runclient"
myJobQueue = []
10.times{|i| myJobQueue << Job.new(:name=>"job#{i}", :params=>"#{i}", :pre_run_handler => pre_run_handler, 
                                   :output_handler=>output_handler, :post_run_handler=>post_run_handler)}

Create the server

server = JobServer.new(myJobQueue, "~/work") #run 1 local worker implicitly
server.add_ssh_worker("192.168.0.1", "~/work_sparc")
server.add_ssh_worker("192.168.0.2", "~/work", 2)
server.dumpStatistics
server.serve # Wait until all jobs have finished

Defined Under Namespace

Classes: Deadlock

Constructor Details

#initialize(jobQueue, local_working_directory = "", numLocalWorkers = 1, terminateWorkersWhenJobQueueEmpty = true) ⇒ JobServer

Instantiates a new JobServer object and creates the given number of local clients, called workers.

jobQueue

is an array of Job objects. Jobs on the left side of the array are processed first. Jobs that had errors during execution are re-enqueued at the end of the queue. To decide yourself whether a failed job is re-enqueued, override the method #retryJob.

local_working_directory

is the directory in which local clients are launched

numLocalWorkers

is the number of client workers which run on the server itself (e.g. number of CPUs)

terminateWorkersWhenJobQueueEmpty

false means the workers keep running even when the queue is empty, until #close is called. Use this if you want to add jobs while others are running: it can happen that all remaining jobs are running and the queue is empty, but you still want to add more jobs.
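
The re-enqueue behaviour described for jobQueue can be sketched without the library. This is a minimal illustration under stated assumptions, not JobServer's actual worker loop: QueuedJob, retry_job? and run_queue are hypothetical names, the block stands in for running the client, and the three-try limit mirrors the default documented for #retryJob.

```ruby
# Hypothetical stand-in for Job: only the fields this sketch needs.
QueuedJob = Struct.new(:name, :num_tries, :results)

# Assumed policy, mirroring the documented default of retryJob: up to three tries.
def retry_job?(job)
  job.num_tries < 3
end

# Processes jobs from the left; a failed job goes back to the end of the queue.
# The block returns the job's results (nil or false signals a failure).
def run_queue(queue)
  order = []
  until queue.empty?
    job = queue.shift
    job.num_tries += 1
    job.results = yield(job)
    order << "#{job.name}##{job.num_tries}"
    queue.push(job) if !job.results && retry_job?(job)
  end
  order
end

queue = %w[a b c].map { |name| QueuedJob.new(name, 0, nil) }
# Job "a" fails on its first try only; every other run succeeds.
order = run_queue(queue) { |job| job.name == "a" && job.num_tries == 1 ? nil : ["ok"] }
puts order.join(", ")   # a#1, b#1, c#1, a#2
```

The failed job "a" is retried only after "b" and "c" have run, which is the effect of appending it to the end of the queue.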



# File 'lib/jobserver.rb', line 389

def initialize(jobQueue, local_working_directory = "", numLocalWorkers = 1, terminateWorkersWhenJobQueueEmpty = true)
  @jobQueue = jobQueue
  @jobQueue.extend(MonitorMixin)
  @initialQueueLength = @jobQueue.length
  @jobsRunning = []
  @jobsRunning.extend(MonitorMixin)
  @noJobs_cond = @jobQueue.new_cond
  @local_working_directory = local_working_directory
  @workers = []
  @hostStats = Hash.new
  @hostStats.extend(MonitorMixin)
  @terminateWorkersWhenJobQueueEmpty = terminateWorkersWhenJobQueueEmpty
  @usedHosts = []
  add_local_worker(numLocalWorkers)
end

Instance Attribute Details

#hostStats ⇒ Object (readonly)

hostStats[hostname] returns the HostStatistics object for the string hostname



# File 'lib/jobserver.rb', line 370

def hostStats
  @hostStats
end

#workers ⇒ Object (readonly)

an array of the worker threads currently running



# File 'lib/jobserver.rb', line 368

def workers
  @workers
end

Instance Method Details

#add_job(job) ⇒ Object Also known as: <<

Appends a job to the end of the queue and informs the sleeping worker threads that new jobs are available.



# File 'lib/jobserver.rb', line 468

def add_job(job)
  @jobQueue << job
  @noJobs_cond.broadcast #wake up workers that wait for new jobs
end

#add_local_worker(numWorkers = 1) ⇒ Object

Adds a worker thread that processes jobs on the local machine. This method is called automatically on object instantiation. numWorkers indicates how many workers should use this client; it should usually not exceed the number of CPUs available on the host. Set it to zero if the server machine should not also be used as a client.



# File 'lib/jobserver.rb', line 480

def add_local_worker(numWorkers = 1)
  add_worker("localhost", @local_working_directory, numWorkers)
end

#add_ssh_worker(hostname, working_directory = "", numWorkers = 1) ⇒ Object

Adds a worker thread that processes jobs on a given remote host. hostname can contain a username like: [email protected]. If working_directory is not empty, the client command is executed in that directory. numWorkers indicates how many workers should use this client; it should usually not exceed the number of CPUs available on the host.



# File 'lib/jobserver.rb', line 490

def add_ssh_worker(hostname, working_directory = "", numWorkers = 1)
  hostname = "localhost" if ENV['HOSTNAME'] == hostname
  add_worker(hostname, working_directory, numWorkers)
end

#close ⇒ Object

Call close only if you set terminateWorkersWhenJobQueueEmpty to false at instantiation. It tells the server that no further jobs will be added to the job queue, so all worker threads that have been waiting for new jobs terminate. You should still wait for all workers to complete; use serve to do so.
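
The keep-alive workflow (workers wait on an empty queue until close is called) can be illustrated with Ruby's standard MonitorMixin. The following is a self-contained sketch, not the JobServer implementation: KeepAliveQueue and start_worker are hypothetical names, and "processing" a job just records it in a list. The sketch signals its condition variable inside synchronize and uses broadcast in close so that every waiting worker wakes up.

```ruby
require 'monitor'

# Hypothetical miniature of the keep-alive pattern; not the JobServer class.
class KeepAliveQueue
  attr_reader :done

  def initialize
    @jobs = []
    @jobs.extend(MonitorMixin)     # the array doubles as its own lock
    @no_jobs = @jobs.new_cond      # workers sleep on this while the queue is empty
    @closed = false
    @done = []
    @done.extend(MonitorMixin)
  end

  # Appends a job and wakes any sleeping workers.
  def add_job(job)
    @jobs.synchronize do
      @jobs << job
      @no_jobs.broadcast
    end
  end

  # Announces that no further jobs will arrive; idle workers then terminate.
  def close
    @jobs.synchronize do
      @closed = true
      @no_jobs.broadcast
    end
  end

  # Starts one worker thread that runs until the queue is closed and drained.
  def start_worker
    Thread.new do
      loop do
        job = @jobs.synchronize do
          @no_jobs.wait_until { !@jobs.empty? || @closed }
          @jobs.shift              # nil once the queue is closed and empty
        end
        break if job.nil?
        @done.synchronize { @done << job }
      end
    end
  end
end

queue = KeepAliveQueue.new
workers = Array.new(2) { queue.start_worker }
queue.add_job("job0")              # workers stay alive between jobs
queue.add_job("job1")
queue.close                        # no more jobs will be added
workers.each(&:join)               # still wait for the workers, as with serve
puts queue.done.sort.inspect
```

Jobs added before close are always processed, because a worker only exits when it finds the queue both closed and empty.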



# File 'lib/jobserver.rb', line 426

def close
  @terminateWorkersWhenJobQueueEmpty = true
  @noJobs_cond.signal
end

#dumpStatistics(filename = "jobserver_stats.txt", timeInSec = 60) ⇒ Object

Writes statistics for each host to the given file. If filename has no directory part, the file is placed in the local working directory given to new. timeInSec is the interval in seconds after which the current statistics are written. A value of 0 means the statistics are written only when all jobs have finished. When the job server terminates, the last state is written regardless of the interval.
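
The filename rule can be shown in isolation. In this small sketch, resolve_stats_path is a hypothetical helper (it is not part of JobServer) that applies the same test as the method: a name equal to its own basename has no directory part and is placed in the working directory.

```ruby
# Hypothetical helper mirroring the filename rule of dumpStatistics.
def resolve_stats_path(working_directory, filename = "jobserver_stats.txt")
  # A bare file name has no directory part, so it equals its own basename.
  return File.join(working_directory, filename) if filename == File.basename(filename)
  filename   # a name with a directory part is used unchanged
end

puts resolve_stats_path("~/work")                     # ~/work/jobserver_stats.txt
puts resolve_stats_path("~/work", "logs/stats.txt")   # logs/stats.txt
puts resolve_stats_path("~/work", "/tmp/stats.txt")   # /tmp/stats.txt
```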



# File 'lib/jobserver.rb', line 437

def dumpStatistics(filename = "jobserver_stats.txt", timeInSec=60)
  filename = File.join(@local_working_directory, filename) if filename == File.basename(filename)
  
  @dumpStatThread = Thread.new(timeInSec) do |sleepTime|
    loop do
      sleep(sleepTime)
      File.open(filename,"w") do |file|
        file.puts "Host statistics:\n================"
        @hostStats.synchronize do
          @hostStats.to_a.sort{|x,y| x[0]<=>y[0]}.each{|host,stats| file.puts "#{host}: #{stats}"}
        end
        unless @jobsRunning.empty? 
          @jobsRunning.synchronize do
            file.puts "\nJobs running:\n============"
            file.puts @jobsRunning.map {|job| "#{job.host}: #{job}" },""
          end
        end
        unless @jobQueue.empty?
          @jobQueue.synchronize do 
            file.puts s="Jobs in the queue: (#{@jobQueue.length}/#@initialQueueLength remaining)"
            file.puts "="*s.length                       
            file.puts @jobQueue
          end
        end
      end
    end 
  end
end

#getNextJob(hostname) ⇒ Object

Removes and returns a Job from the queue using a FIFO strategy that respects the jobs' dependency constraints: a job is chosen only if every job in its dependency list has finished successfully. The user must guarantee that the dependency graph has no cycles. In addition, each job has a method runsOnHost that you may override to decide whether the job is fit to run on the given host. It may happen that no host gets a job, for example when a job that is crucial to others has failed so that they can no longer be executed. This deadlock situation raises an exception.

When a job failed on a host, it tries to rerun on other hosts first (if there are any).
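
The selection rule described above can be tried out on plain data. This is a simplified sketch under stated assumptions, not the library code: DepJob, next_job and the :success status symbol are hypothetical stand-ins for Job, getNextJob and Job::SUCCESS, and deadlock detection is not modelled.

```ruby
# Hypothetical stand-in for Job with only the fields the selection rule reads.
DepJob = Struct.new(:name, :dependencies, :status, :failed_on_hosts, :allowed_hosts) do
  # nil means "no restriction"; otherwise the job runs only on the listed hosts.
  def runs_on_host?(host)
    allowed_hosts.nil? || allowed_hosts.include?(host)
  end
end

# Returns and removes the first eligible job for hostname, or nil if there is none.
def next_job(queue, hostname, all_hosts)
  job = queue.find do |candidate|
    ready = candidate.dependencies.all? { |dep| dep.status == :success } &&
            candidate.runs_on_host?(hostname)
    if ready && candidate.failed_on_hosts.include?(hostname)
      # Prefer hosts the job has not failed on yet; retry here only if none remain.
      untried = (all_hosts - candidate.failed_on_hosts).select { |h| candidate.runs_on_host?(h) }
      ready = untried.empty?
    end
    ready
  end
  queue.delete(job)   # Array#delete(nil) returns nil when nothing was eligible
end

prepare = DepJob.new("prepare", [], :waiting, [], nil)
build   = DepJob.new("build", [prepare], :waiting, [], nil)
flaky   = DepJob.new("flaky", [], :waiting, ["hostA"], nil)
queue   = [build, flaky, prepare]
hosts   = ["hostA", "hostB"]

first  = next_job(queue, "hostA", hosts)   # build is blocked, flaky prefers hostB
prepare.status = :success                  # pretend the job ran successfully
second = next_job(queue, "hostA", hosts)   # build's dependency is now satisfied
third  = next_job(queue, "hostA", hosts)   # only flaky is left; hostA must wait
fourth = next_job(queue, "hostB", hosts)   # hostB has not failed on flaky yet

puts [first, second, third, fourth].map { |j| j ? j.name : "none" }.join(", ")
# prepare, build, none, flaky
```

Although "build" is first in the queue, it is skipped until its dependency succeeds, and "flaky" is withheld from the host it already failed on while an untried host exists.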



# File 'lib/jobserver.rb', line 527

def getNextJob(hostname)
  job = @jobQueue.find do |job|
    # it is assumed that all jobs form the queue are WAITING
    ok = (job.dependencies.length == job.dependencies.select{|dep| dep.status == Job::SUCCESS}.length and
            job.runsOnHost(hostname))
    # ok == true => job could be run on 'hostname'
    # did the job fail on this host?
    if ok and job.failedOnHosts.include?(hostname)
      # if it failed already on some hosts but there are still machines it could run on and didn't fail on them yet,
      # so run on those hosts first. If otherwise the job failed on all hosts, try the current host again.
      ok = job.runsOnHosts(@usedHosts - job.failedOnHosts).empty?
    end
    ok
  end
  return @jobQueue.delete(job) # returns nil if job == nil
end

#retryJob(job) ⇒ Object

Called when a job could not be executed; determines whether to try to run the job again. A job counts as failed if its results object is false or nil. See Job.new for further details about results. By default, retryJob allows three tries before the job is given up.

To change this behaviour, override the method retryJob with your own, e.g.:
 require 'jobserver'

 class JobServer
   def retryJob(job)
     puts "Job failed: #{job.name}"
     return false
   end
 end


# File 'lib/jobserver.rb', line 508

def retryJob(job)
  if job.numTries < 3
    puts "FAILURE: Will try to run job later: #{job}"
    true
  end # else return false implicitly
end

#serve(verbose = true) ⇒ Object

serve waits for all jobs to terminate and prints per-host statistics if verbose is true. It raises an Exception if no workers are registered.

Raises:

  • (Exception)


# File 'lib/jobserver.rb', line 406

def serve(verbose = true)
  raise(Exception, "No workers registered but serve was called. The jobs can't be processed!",caller) if @workers.empty?
  @workers.each{|worker| worker.wakeup}  #wake all workers up
  @workers.each{|worker| worker.join}  #wait for all workers to finish

  @dumpStatThread.wakeup if defined?(@dumpStatThread) and @dumpStatThread.status #wake up the statistics dumper

  #output statistics
  if verbose
    puts "Host statistics:\n================"
    @hostStats.to_a.sort{|x,y| x[0]<=>y[0]}.each{|host,stats| puts "#{host}: #{stats}"}
    puts
  end
end