Class: Ripe::WorkerController::Preparer
- Inherits: Object
  - Object
  - Ripe::WorkerController::Preparer
- Defined in: lib/ripe/worker_controller/preparer.rb
Overview
This class controls worker preparation from a given workflow, list of samples and parameters. It applies the workflow to each of the specified samples.
Instance Attribute Summary
- #workers ⇒ Array<Worker>
  workers prepared in current batch.
Instance Method Summary
- #initialize(workflow, samples, params = {}) ⇒ Preparer (constructor)
  Prepare workers by applying the workflow callback and its parameters to each sample.
- #load_workflow(workflow, params) ⇒ Proc, Hash<Symbol, String>
  Load a workflow and return its callback and params components.
- #prepare_sample_blocks(samples, callback, params) ⇒ Hash
  Apply the workflow (callback) to each sample, producing a single root block per sample.
- #prepare_worker(worker_sample_blocks, params) ⇒ DB::Worker
  Prepare a worker from a group of sample blocks.
- #prepare_worker_blocks(worker_sample_blocks, worker) ⇒ Array<Blocks::Block>
  Organize worker blocks into tasks and prepare them.
Constructor Details
#initialize(workflow, samples, params = {}) ⇒ Preparer
Prepare workers by applying the workflow callback and its parameters to each sample.
    # File 'lib/ripe/worker_controller/preparer.rb', line 28

    def initialize(workflow, samples, params = {})
      # Extract callback and params from input
      callback, params = load_workflow(workflow, params)

      if ![:patch, :force, :depend].include?(params[:mode].to_sym)
        abort "Invalid mode #{params[:mode]}."
      end

      # Apply the workflow to each sample
      sample_blocks = prepare_sample_blocks(samples, callback, params)

      if sample_blocks
        # Split samples into groups of +:group_num+ samples and produce a
        # worker from each of these groups.
        @workers = sample_blocks.each_slice(params[:group_num].to_i).map do |worker_blocks|
          prepare_worker(worker_blocks, params)
        end
      else
        []
      end
    end
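The grouping step in the constructor can be tried in isolation. The following is a minimal sketch, not part of ripe itself: the sample names and the symbols standing in for blocks are hypothetical, and the group size is given as a String to mirror the `params[:group_num].to_i` conversion.

```ruby
# Hypothetical stand-in for the {sample => block} hash built by the constructor.
sample_blocks = { "s1" => :block1, "s2" => :block2, "s3" => :block3 }

# The group size may arrive as a String, hence the to_i conversion.
group_num = "2".to_i

# Hash#each_slice yields arrays of [sample, block] pairs, one array per group.
groups = sample_blocks.each_slice(group_num).map do |worker_blocks|
  worker_blocks
end

p groups
# => [[["s1", :block1], ["s2", :block2]], [["s3", :block3]]]
```

Each group is an array of `[sample, block]` pairs rather than a Hash, which is the shape `prepare_worker` receives. A `:group_num` that converts to 0 would make `each_slice` raise an `ArgumentError`.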
Instance Attribute Details
#workers ⇒ Array<Worker>
workers prepared in current batch
    # File 'lib/ripe/worker_controller/preparer.rb', line 15

    def workers
      @workers
    end
Instance Method Details
#load_workflow(workflow, params) ⇒ Proc, Hash<Symbol, String>
Load a workflow and return its callback and params components.
    # File 'lib/ripe/worker_controller/preparer.rb', line 58

    def load_workflow(workflow, params)
      filename = Library.find(:workflow, workflow)
      abort "Could not find workflow #{workflow}." if filename == nil
      require_relative filename

      # Imports +$workflow+ from the workflow component. This is a dirty
      # hack to help make the +DSL::WorkflowDSL+ more convenient for the
      # end user.

      params = {
        wd: Dir.pwd,
        mode: :patch,
        group_num: 1,
      }.merge($workflow.params.merge(params))

      [$workflow.callback, params]
    end
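The nested `merge` determines which source of parameters wins. The sketch below reproduces only that merge with plain hashes; the values for `workflow_params` and `user_params` are hypothetical, and `"/tmp"` stands in for `Dir.pwd`.

```ruby
# Built-in defaults, as in load_workflow ("/tmp" stands in for Dir.pwd).
defaults = { wd: "/tmp", mode: :patch, group_num: 1 }

# Hypothetical parameters declared by the workflow file ($workflow.params).
workflow_params = { mode: :force, queue: "batch" }

# Hypothetical parameters passed by the caller.
user_params = { mode: :depend }

# Hash#merge lets the argument win on key conflicts, so the caller overrides
# the workflow, and both override the defaults.
params = defaults.merge(workflow_params.merge(user_params))

p params
# => {:wd=>"/tmp", :mode=>:depend, :group_num=>1, :queue=>"batch"}
# (Ruby 3.4+ prints the same hash as {wd: "/tmp", mode: :depend, ...})
```

The resulting precedence is caller parameters first, then workflow parameters, then the defaults.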
#prepare_sample_blocks(samples, callback, params) ⇒ Hash
Apply the workflow (callback) to each sample, producing a single root block per sample.
    # File 'lib/ripe/worker_controller/preparer.rb', line 85

    def prepare_sample_blocks(samples, callback, params)
      sample_blocks = samples.map do |sample|
        block = callback.call(sample, params)

        if block
          # No need to prune if callback returns nil
          block = block.prune(params[:mode].to_sym == :force,
                              params[:mode].to_sym == :depend)
        end

        if block != nil
          puts "Preparing sample #{sample}"
          {sample => block}
        else
          puts "Nothing to do for sample #{sample}"
          nil
        end
      end

      # Produce a {sample => block} hash
      sample_blocks.compact.inject(&:merge)
    end
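The final `compact.inject(&:merge)` step has one edge case worth seeing: with no surviving samples it yields `nil` rather than an empty hash. A minimal sketch with hypothetical sample names and symbols in place of blocks:

```ruby
# Per-sample results as produced by the map: a one-entry hash, or nil when
# there is nothing to do for that sample.
results = [{ "s1" => :block1 }, nil, { "s3" => :block3 }]

# Drop the nils, then fold the remaining hashes into one.
merged = results.compact.inject(&:merge)
p merged  # => {"s1"=>:block1, "s3"=>:block3}

# When every sample was skipped, inject on an empty array returns nil.
nothing = [nil, nil].compact.inject(&:merge)
p nothing # => nil
```

This is why the constructor checks `if sample_blocks` before grouping. As the constructor listing reads, its `else` branch evaluates to `[]` without assigning `@workers`, so `#workers` appears to return `nil` in that case.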
#prepare_worker(worker_sample_blocks, params) ⇒ DB::Worker
Prepare a worker from a group of sample blocks.
    # File 'lib/ripe/worker_controller/preparer.rb', line 117

    def prepare_worker(worker_sample_blocks, params)
      worker = DB::Worker.create(handle: params[:handle])
      worker_blocks = prepare_worker_blocks(worker_sample_blocks, worker)

      # Combine all grouped sample blocks into a single worker block
      params = params.merge({
        name: worker.id,
        stdout: worker.stdout,
        stderr: worker.stderr,
        command: Blocks::SerialBlock.new(*worker_blocks).command,
      })

      worker_block = Blocks::LiquidBlock.new("#{PATH}/share/moab.sh", params)
      File.open(worker.sh, 'w') { |f| f.write(worker_block.command) }

      worker.update({
        status: :prepared,
        ppn: params[:ppn],
        queue: params[:queue],
        walltime: params[:walltime],
      })

      worker
    end
#prepare_worker_blocks(worker_sample_blocks, worker) ⇒ Array<Blocks::Block>
Organize worker blocks into tasks and prepare them.
Returns an array of blocks containing as many elements as there are samples in the group.
    # File 'lib/ripe/worker_controller/preparer.rb', line 152

    def prepare_worker_blocks(worker_sample_blocks, worker)
      worker_sample_blocks.map do |sample, block|
        # Preorder traversal of blocks -- assign incremental numbers starting from
        # 1 to each node as it is being traversed, as well as producing the job
        # file for each task.
        post_var_assign = lambda do |subblock|
          if subblock.blocks.length == 0
            # This section is only called when the subblock is actually a working
            # block (a leaf in the block arborescence).
            task = worker.tasks.create({
              sample: sample,
              block: subblock.id,
            })

            File.open(task.sh, 'w') { |f| f.write(subblock.command) }
            subblock.vars.merge!(log: task.log)
          else
            subblock.blocks.each(&post_var_assign)
          end
        end

        post_var_assign.call(block)
        block
      end
    end
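The self-referencing lambda is the core of this method: it descends through nested blocks and acts only on leaves. The sketch below illustrates the same traversal on a hypothetical tree built from a `Struct`; it collects leaf ids instead of creating tasks and writing job files.

```ruby
# Hypothetical stand-in for Blocks::Block: an id plus a list of child blocks.
node_class = Struct.new(:id, :blocks)

tree = node_class.new("root", [
  node_class.new("a", []),
  node_class.new("group", [
    node_class.new("b", []),
    node_class.new("c", []),
  ]),
])

leaves = []

# The lambda refers to itself through the local variable it is assigned to,
# which is set by the time the lambda is first called.
visit = lambda do |node|
  if node.blocks.empty?
    leaves << node.id          # leaf: this is where a task would be created
  else
    node.blocks.each(&visit)   # inner node: recurse into the children
  end
end

visit.call(tree)
p leaves # => ["a", "b", "c"]
```

Only leaves produce work, in left-to-right order, so each working block in a sample's tree maps to exactly one task.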