Class: JobServer
- Inherits: Object
- Defined in: lib/jobserver.rb
Overview
Usage example:
Create job handlers
require 'jobserver'
pre_run_handler = Proc.new do |job|
  puts "Running job #{job.name} on #{job.host}"
  # Initialize the results object as an empty array:
  job.results = []
end
output_handler = Proc.new do |file, job|
  line = file.gets
  job.results << $1 if line =~ /result: (.*)/
end
post_run_handler = Proc.new do |job|
  if job.results.empty?
    puts "Error executing job #{job.name} on #{job.host}.\n\t#{job}"
  else
    puts $result = job.results.join(",")
  end
end
Create the jobs
Job.default_client_command = "runclient"
myJobQueue = []
10.times do |i|
  myJobQueue << Job.new(:name => "job#{i}", :params => "#{i}",
                        :pre_run_handler => pre_run_handler,
                        :output_handler => output_handler,
                        :post_run_handler => post_run_handler)
end
Create the server
server = JobServer.new(myJobQueue, "~/work") #run 1 local worker implicitly
server.add_ssh_worker("192.168.0.1", "~/work_sparc")
server.add_ssh_worker("192.168.0.2", "~/work", 2)
server.dumpStatistics
server.serve # Wait until all jobs have finished
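The handler contract from the usage example can be tried out without the library. This is a sketch under stated assumptions: FakeJob and simulate_run are hypothetical names, and the driver assumes the server calls output_handler once per line of client output until that output ends, which this page does not state. The post-run handler returns the joined results instead of printing them so the outcome can be checked.

```ruby
require 'stringio'

# Hypothetical stand-in for the library's Job class.
FakeJob = Struct.new(:name, :host, :results)

pre_run_handler = Proc.new do |job|
  job.results = [] # start with an empty results array
end

output_handler = Proc.new do |file, job|
  line = file.gets
  job.results << $1 if line =~ /result: (.*)/
end

# Returns the summary instead of printing it, so it can be checked.
post_run_handler = Proc.new do |job|
  job.results.empty? ? nil : job.results.join(",")
end

# Assumption: the server calls output_handler once per line until the client output ends.
def simulate_run(job, client_output, pre_run, on_output, post_run)
  io = StringIO.new(client_output)
  pre_run.call(job)
  on_output.call(io, job) until io.eof?
  post_run.call(job)
end

job = FakeJob.new("job3", "localhost", nil)
summary = simulate_run(job, "starting\nresult: 3\nresult: 9\ndone\n",
                       pre_run_handler, output_handler, post_run_handler)
puts summary # 3,9
```

Lines that do not match the pattern (here "starting" and "done") are simply ignored by the output handler.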
Defined Under Namespace
Classes: Deadlock
Instance Attribute Summary
- #hostStats ⇒ Object (readonly)
  hostStats[hostname] returns the HostStatistics object for the string hostname.
- #workers ⇒ Object (readonly)
  An array of the worker threads currently running.
Instance Method Summary
- #add_job(job) ⇒ Object (also: #<<)
  Appends a job to the end of the queue and informs the sleeping worker threads that new jobs are available.
- #add_local_worker(numWorkers = 1) ⇒ Object
  Adds a worker thread that processes jobs on the local machine.
- #add_ssh_worker(hostname, working_directory = "", numWorkers = 1) ⇒ Object
  Adds a worker thread that processes jobs on a given remote host machine.
- #close ⇒ Object
  close must be called only when you have set terminateWorkersWhenJobQueueEmpty to false during instantiation.
- #dumpStatistics(filename = "jobserver_stats.txt", timeInSec = 60) ⇒ Object
  Writes statistics for each host to the given file.
- #getNextJob(hostname) ⇒ Object
  Removes and returns a Job from the queue using a FIFO strategy but considering the dependency constraints of the jobs.
- #initialize(jobQueue, local_working_directory = "", numLocalWorkers = 1, terminateWorkersWhenJobQueueEmpty = true) ⇒ JobServer (constructor)
  Instantiates a new JobServer object and creates the given number of local clients, called workers.
- #retryJob(job) ⇒ Object
  Is called when job couldn't be executed.
- #serve(verbose = true) ⇒ Object
  serve waits for all jobs to terminate and outputs statistics if verbose is true.
Constructor Details
#initialize(jobQueue, local_working_directory = "", numLocalWorkers = 1, terminateWorkersWhenJobQueueEmpty = true) ⇒ JobServer
Instantiates a new JobServer object and creates the given number of local clients, called workers.
- jobQueue: an array of jobs of type Job. Jobs at the beginning of the array are processed first. Jobs that had errors during execution are re-enqueued at the end of the queue if #retryJob allows another try. If you want to decide yourself whether to re-enqueue a job that had errors, override #retryJob with your own method.
- local_working_directory: the directory in which local clients are launched.
- numLocalWorkers: the number of client workers that run on the server itself (e.g. the number of CPUs).
- terminateWorkersWhenJobQueueEmpty: if false, the workers keep running even when the queue is empty, until #close is called. Use this if you want to add jobs while others are running: otherwise it can happen that all remaining jobs are running and the queue is empty, so the workers terminate although you still want to add more jobs.
# File 'lib/jobserver.rb', line 389
def initialize(jobQueue, local_working_directory = "", numLocalWorkers = 1, terminateWorkersWhenJobQueueEmpty = true)
  @jobQueue = jobQueue
  @jobQueue.extend(MonitorMixin)
  @initialQueueLength = @jobQueue.length
  @jobsRunning = []
  @jobsRunning.extend(MonitorMixin)
  @noJobs_cond = @jobQueue.new_cond
  @local_working_directory = local_working_directory
  @workers = []
  @hostStats = Hash.new
  @hostStats.extend(MonitorMixin)
  @terminateWorkersWhenJobQueueEmpty = terminateWorkersWhenJobQueueEmpty
  @usedHosts = []
  add_local_worker(numLocalWorkers)
end
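The constructor keeps a reference to the array you pass as jobQueue rather than a copy, and extends that very array with MonitorMixin. A minimal sketch of the consequence, using only Ruby's standard monitor library; the job names are placeholders, and taking jobs with shift is a simplification of the dependency-aware selection in getNextJob:

```ruby
require 'monitor'

jobs = ["job0", "job1", "job2"] # the array a caller would pass as jobQueue
queue = jobs                     # like @jobQueue = jobQueue: same object, no copy
queue.extend(MonitorMixin)       # the caller's array now has synchronize and new_cond
no_jobs = queue.new_cond         # condition variable bound to the array's monitor

# In this simplified sketch a worker takes the leftmost job while holding the monitor.
first = queue.synchronize { queue.shift }
puts first # job0
p jobs     # ["job1", "job2"]: the caller's array was drained as well
```

In practice this means the caller should not modify the array directly after handing it over; #add_job is the documented way to add jobs.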
Instance Attribute Details
#hostStats ⇒ Object (readonly)
hostStats[hostname] returns the HostStatistics object for the string hostname.
# File 'lib/jobserver.rb', line 370
def hostStats
  @hostStats
end
#workers ⇒ Object (readonly)
An array of the worker threads currently running.
# File 'lib/jobserver.rb', line 368
def workers
  @workers
end
Instance Method Details
#add_job(job) ⇒ Object Also known as: <<
Appends a job to the end of the queue and wakes up the worker threads that are waiting for new jobs.
# File 'lib/jobserver.rb', line 468
def add_job(job)
  @jobQueue.synchronize do
    @jobQueue << job
    @noJobs_cond.broadcast # wake up workers that wait for new jobs (requires holding the monitor)
  end
end
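A MonitorMixin condition variable may only be signalled by the thread that holds its monitor; calling broadcast without it raises ThreadError. The sketch below, using only the standard monitor library, checks this and shows a hypothetical add_job_locked helper that appends and broadcasts inside synchronize, waking a worker that is blocked in wait_while.

```ruby
require 'monitor'

queue = []
queue.extend(MonitorMixin)
no_jobs = queue.new_cond

# Broadcasting without holding the monitor is rejected.
unlocked_error =
  begin
    no_jobs.broadcast
    nil
  rescue ThreadError => e
    e
  end

# Hypothetical helper: append a job and wake all waiting workers under the lock.
def add_job_locked(queue, cond, job)
  queue.synchronize do
    queue << job
    cond.broadcast
  end
end

# A worker that blocks until a job is available.
worker = Thread.new do
  queue.synchronize do
    no_jobs.wait_while { queue.empty? } # releases the monitor while waiting
    queue.shift
  end
end

add_job_locked(queue, no_jobs, "job42")
taken = worker.value # joins the thread and returns the block's result
```

The order of the two threads does not matter: if the job is appended before the worker starts waiting, wait_while sees a non-empty queue and returns immediately.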
#add_local_worker(numWorkers = 1) ⇒ Object
Adds worker threads that process jobs on the local machine. This method is called automatically on object instantiation. numWorkers indicates how many workers should use this client. It should usually not exceed the number of CPUs available on the host. You may wish to set it to zero if the server machine should not also be used as a client.
# File 'lib/jobserver.rb', line 480
def add_local_worker(numWorkers = 1)
  add_worker("localhost", @local_working_directory, numWorkers)
end
#add_ssh_worker(hostname, working_directory = "", numWorkers = 1) ⇒ Object
Adds a worker thread that processes jobs on a given remote host machine. hostname can contain a username in the form user@host. If hostname equals the HOSTNAME environment variable, the worker is treated as running on localhost. If working_directory is not empty, the client command is executed in the given directory. numWorkers indicates how many workers should use this client. It should usually not exceed the number of CPUs available on the host.
# File 'lib/jobserver.rb', line 490
def add_ssh_worker(hostname, working_directory = "", numWorkers = 1)
  hostname = "localhost" if ENV['HOSTNAME'] == hostname
  add_worker(hostname, working_directory, numWorkers)
end
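The first line of add_ssh_worker is a plain string comparison against the HOSTNAME environment variable, so a user@host name never matches, and HOSTNAME may not be set in every environment (Ruby's standard library also offers Socket.gethostname). A sketch with a hypothetical helper and a hash standing in for ENV:

```ruby
# Hypothetical helper mirroring the first line of add_ssh_worker.
def effective_worker_host(hostname, env = ENV)
  env['HOSTNAME'] == hostname ? "localhost" : hostname
end

env = { 'HOSTNAME' => 'node1' } # stand-in for ENV
p effective_worker_host('node1', env)       # "localhost"
p effective_worker_host('alice@node1', env) # "alice@node1": compared verbatim, not normalized
p effective_worker_host('node2', env)       # "node2"
```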
#close ⇒ Object
close must be called only if you set terminateWorkersWhenJobQueueEmpty to false during instantiation. It tells the server that no further jobs will be added to the job queue, so all worker threads that have been waiting for new jobs terminate. You should still wait for all workers to complete; use #serve to do so.
# File 'lib/jobserver.rb', line 426
def close
  @jobQueue.synchronize do
    @terminateWorkersWhenJobQueueEmpty = true
    @noJobs_cond.broadcast # wake all waiting workers, not just one, so that each can terminate
  end
end
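The interplay of terminateWorkersWhenJobQueueEmpty = false, add_job and close can be sketched with the standard monitor library. MiniServer is a hypothetical, heavily simplified stand-in, not the library's implementation: workers wait while the queue is empty and the server is not closed; close sets the flag and broadcasts so that every idle worker re-checks it, since a single signal would wake only one of them.

```ruby
require 'monitor'

# Hypothetical minimal server: workers keep waiting for jobs until close is called.
class MiniServer
  attr_reader :processed

  def initialize(num_workers)
    @queue = []
    @queue.extend(MonitorMixin)
    @no_jobs = @queue.new_cond
    @closed = false
    @processed = Queue.new # thread-safe record of finished jobs
    @workers = Array.new(num_workers) { Thread.new { work } }
  end

  def add_job(job)
    @queue.synchronize do
      @queue << job
      @no_jobs.broadcast
    end
  end

  def close
    @queue.synchronize do
      @closed = true
      @no_jobs.broadcast # every idle worker must re-check the flag
    end
  end

  def serve
    @workers.each(&:join)
  end

  private

  def work
    loop do
      job = @queue.synchronize do
        @no_jobs.wait_while { @queue.empty? && !@closed }
        @queue.shift # nil only once the queue is empty and closed
      end
      break if job.nil?
      @processed << job
    end
  end
end

server = MiniServer.new(3)
5.times { |i| server.add_job("job#{i}") }
server.close
server.serve # returns once all three workers have exited
```

Jobs added before close are still drained, because a worker only gives up when the queue is empty and the server is closed.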
#dumpStatistics(filename = "jobserver_stats.txt", timeInSec = 60) ⇒ Object
Writes statistics for each host, followed by the running jobs and the jobs still in the queue, to the given file. If filename contains no directory, the file is placed in the local working directory given to new. timeInSec is the interval in seconds after which the current statistics are rewritten; 0 means the file is written only when all jobs have finished. When the job server terminates, the last state is written regardless of timeInSec.
# File 'lib/jobserver.rb', line 437
def dumpStatistics(filename = "jobserver_stats.txt", timeInSec = 60)
  # a bare file name goes into the local working directory (if one was given)
  if filename == File.basename(filename) and not @local_working_directory.empty?
    filename = File.join(@local_working_directory, filename)
  end
  @dumpStatThread = Thread.new(timeInSec) do |sleepTime|
    loop do
      # with 0, sleep until serve wakes this thread up instead of rewriting the file continuously
      sleepTime > 0 ? sleep(sleepTime) : sleep
      File.open(filename, "w") do |file|
        file.puts "Host statistics:\n================"
        @hostStats.synchronize do
          @hostStats.to_a.sort{|x,y| x[0]<=>y[0]}.each{|host,stats| file.puts "#{host}: #{stats}"}
        end
        unless @jobsRunning.empty?
          @jobsRunning.synchronize do
            file.puts "\nJobs running:\n============"
            file.puts @jobsRunning.map {|job| "#{job.host}: #{job}" },""
          end
        end
        unless @jobQueue.empty?
          @jobQueue.synchronize do
            file.puts s="Jobs in the queue: (#{@jobQueue.length}/#@initialQueueLength remaining)"
            file.puts "="*s.length
            file.puts @jobQueue
          end
        end
      end
    end
  end
end
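Two steps of dumpStatistics can be isolated and checked: resolving the output path and formatting the host table. stats_path and host_stats_report are hypothetical helpers. stats_path makes two choices explicit, both assumptions of this sketch: an empty working directory means the current directory (File.join("", name) would yield "/name"), and File.expand_path is applied because File.open does not expand a leading "~", such as the "~/work" in the usage example.

```ruby
# Hypothetical helper: where the statistics file ends up.
def stats_path(filename, working_directory)
  bare_name = filename == File.basename(filename)
  path = bare_name && !working_directory.empty? ? File.join(working_directory, filename) : filename
  File.expand_path(path) # assumption: expand "~" and relative paths
end

# Hypothetical helper: the host table, sorted by host name.
def host_stats_report(host_stats)
  lines = ["Host statistics:", "================"]
  host_stats.sort_by { |host, _stats| host }.each { |host, stats| lines << "#{host}: #{stats}" }
  lines.join("\n") + "\n"
end

puts stats_path("jobserver_stats.txt", "/srv/work") # /srv/work/jobserver_stats.txt
puts host_stats_report("node2" => "3 jobs", "node1" => "5 jobs")
```

sort_by on the host name yields the same order as the sort block in dumpStatistics, since hash keys are unique.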
#getNextJob(hostname) ⇒ Object
Removes and returns a Job from the queue using a FIFO strategy while respecting the dependency constraints of the jobs: only jobs whose dependencies have all finished successfully are chosen. The user has to guarantee that the dependency graph has no cycles. Furthermore, each job has a method runsOnHost that you might want to override in order to decide whether the job is fit to run on the given host. It may happen that no host gets a job, for example when a job that failed is crucial to the execution of others, so that these can no longer be executed. This deadlock situation raises an exception.
When a job failed on a host, it is first rerun on other hosts (if there are any).
# File 'lib/jobserver.rb', line 527
def getNextJob(hostname)
  job = @jobQueue.find do |candidate|
    # it is assumed that all jobs from the queue are WAITING
    ok = (candidate.dependencies.length == candidate.dependencies.select{|dep| dep.status == Job::SUCCESS}.length and candidate.runsOnHost(hostname))
    # ok == true => the job could be run on 'hostname'
    # did the job fail on this host?
    if ok and candidate.failedOnHosts.include?(hostname)
      # If it already failed here but there are still machines it could run on and hasn't failed on yet,
      # run it on those hosts first. If the job failed on all hosts, try the current host again.
      ok = candidate.runsOnHosts(@usedHosts - candidate.failedOnHosts).empty?
    end
    ok
  end
  return @jobQueue.delete(job) # returns nil if job == nil
end
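The selection rule can be sketched on plain Ruby objects. SketchJob and next_job are hypothetical stand-ins for Job and getNextJob; statuses are symbols here, and runs_on_hosts is assumed to return the subset of the given hosts the job can run on, which matches how the emptiness of its result is used above. The sketch removes the chosen job by index rather than with Array#delete, because a Struct compares by value and delete would remove every equal element.

```ruby
# Hypothetical stand-in for Job: allowed_hosts == nil means "any host".
SketchJob = Struct.new(:name, :dependencies, :status, :failed_on_hosts, :allowed_hosts) do
  def runs_on_host?(host)
    allowed_hosts.nil? || allowed_hosts.include?(host)
  end

  # Subset of the given hosts this job may run on.
  def runs_on_hosts(hosts)
    hosts.select { |host| runs_on_host?(host) }
  end
end

# Hypothetical counterpart of getNextJob: first job (FIFO) that is ready for hostname.
def next_job(queue, hostname, used_hosts)
  index = queue.index do |job|
    ok = job.dependencies.all? { |dep| dep.status == :success } && job.runs_on_host?(hostname)
    if ok && job.failed_on_hosts.include?(hostname)
      # Prefer hosts where the job has not failed yet; retry here only if none are left.
      ok = job.runs_on_hosts(used_hosts - job.failed_on_hosts).empty?
    end
    ok
  end
  index && queue.delete_at(index) # nil when no job is ready
end

a = SketchJob.new("a", [], :success, [], nil)
b = SketchJob.new("b", [a], :waiting, [], nil)    # its dependency a succeeded
c = SketchJob.new("c", [b], :waiting, [], nil)    # blocked until b succeeds
d = SketchJob.new("d", [], :waiting, ["h1"], nil) # already failed on h1
queue = [c, d, b]
hosts = ["h1", "h2"]

p next_job(queue, "h1", hosts).name # "b": c is blocked, d prefers h2
p next_job(queue, "h2", hosts).name # "d"
p next_job(queue, "h2", hosts)      # nil: only c is left and b has not succeeded
```

If nothing is ready and no job is running either, the queue can never drain; that is the deadlock case described above.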
#retryJob(job) ⇒ Object
Is called when job couldn't be executed, and determines whether to try to rerun the job. Whether a job had errors depends on whether its results object is false/nil. See Job.new for further details about results. By default, retryJob allows three tries before the job is given up.
If you want different behaviour, override retryJob with your own method, e.g.:
require 'jobserver'
class JobServer
  def retryJob(job)
    puts "Job failed: #{job.name}"
    return false
  end
end
# File 'lib/jobserver.rb', line 508
def retryJob(job)
  if job.numTries < 3
    puts "FAILURE: Will try to run job later: #{job}"
    true
  end # else returns nil (falsy) implicitly
end
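A sketch of how the default policy bounds the number of attempts when failed jobs go back to the end of the queue. RetryJobSketch, retry_job? and process are hypothetical. In particular, the sketch assumes the try counter is incremented before the retry decision, so numTries < 3 yields at most three attempts, and it fakes a client run by failing a job a fixed number of times.

```ruby
# Hypothetical job: fail_times says how many runs fail before one succeeds.
RetryJobSketch = Struct.new(:name, :num_tries, :fail_times)

# Same threshold as the default retryJob.
def retry_job?(job)
  job.num_tries < 3
end

def process(queue)
  done = []
  given_up = []
  until queue.empty?
    job = queue.shift
    job.num_tries += 1                         # assumption: counted before the retry decision
    succeeded = job.num_tries > job.fail_times # stand-in for running the client
    if succeeded
      done << job.name
    elsif retry_job?(job)
      queue << job                             # failed jobs are re-enqueued at the end
    else
      given_up << job.name
    end
  end
  [done, given_up]
end

jobs = [RetryJobSketch.new("ok", 0, 0), RetryJobSketch.new("flaky", 0, 2), RetryJobSketch.new("broken", 0, 5)]
done, given_up = process(jobs.dup)
p done     # ["ok", "flaky"]
p given_up # ["broken"]
```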
#serve(verbose = true) ⇒ Object
serve waits for all jobs to terminate and outputs the host statistics if verbose is true. It raises an exception if no workers have been registered.
# File 'lib/jobserver.rb', line 406
def serve(verbose = true)
  raise(Exception, "No workers registered but serve was called. The jobs can't be processed!", caller) if @workers.empty?
  @workers.each{|worker| worker.wakeup} # wake all workers up
  @workers.each{|worker| worker.join}   # wait for all workers to finish
  @dumpStatThread.wakeup if defined?(@dumpStatThread) and @dumpStatThread.status # wake up the statistics dumper
  # output statistics
  if verbose
    puts "Host statistics:\n================"
    @hostStats.to_a.sort{|x,y| x[0]<=>y[0]}.each{|host,stats| puts "#{host}: #{stats}"}
    puts
  end
end
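A sketch of serve's overall shape, with two deliberate differences that are assumptions of this sketch: it raises ArgumentError instead of the base Exception class (a bare rescue catches only StandardError, so it would not catch Exception), and it tolerates workers that have already finished, because Thread#wakeup raises ThreadError for a dead thread. serve_sketch is a hypothetical name; the statistics are written to any IO so the output can be checked.

```ruby
require 'stringio'

# Hypothetical sketch of serve: wake and join the workers, then print sorted host statistics.
def serve_sketch(workers, host_stats, out: $stdout, verbose: true)
  raise ArgumentError, "No workers registered; the jobs can't be processed" if workers.empty?
  workers.each do |worker|
    begin
      worker.wakeup
    rescue ThreadError
      # the worker has already terminated; Thread#wakeup raises for dead threads
    end
  end
  workers.each(&:join) # wait for all workers to finish
  return unless verbose
  out.puts "Host statistics:\n================"
  host_stats.sort_by { |host, _stats| host }.each { |host, stats| out.puts "#{host}: #{stats}" }
  out.puts
end

finished = Queue.new
workers = Array.new(3) { |i| Thread.new { finished << i } }
out = StringIO.new
serve_sketch(workers, { "node2" => "4 jobs", "node1" => "6 jobs" }, out: out)
puts out.string
```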