Class: Ripe::WorkerController::Preparer

Inherits:
Object
  • Object
show all
Defined in:
lib/ripe/worker_controller/preparer.rb

Overview

This class controls worker preparation from a given workflow, list of samples and parameters. It applies the workflow to each of the specified samples.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(workflow, samples, params = {}) ⇒ Preparer

Prepare workers by applying the workflow callback and its parameters to each sample.

Parameters:

  • workflow (String)

    the name of a workflow to apply on the sample list

  • samples (Array<String>)

    list of samples to apply the callback to

  • params (Hash<Symbol, String>) (defaults to: {})

    a list of worker-wide parameters



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/ripe/worker_controller/preparer.rb', line 28

def initialize(workflow, samples, params = {})
  # Extract callback and params from input
  callback, params = load_workflow(workflow, params)

  if ![:patch, :force, :depend].include?(params[:mode].to_sym)
    abort "Invalid mode #{params[:mode]}."
  end

  # Apply the workflow to each sample
  sample_blocks = prepare_sample_blocks(samples, callback, params)

  if sample_blocks
    # Split samples into groups of +:group_num+ samples and produce a
    # worker from each of these groups.
    @workers = sample_blocks.each_slice(params[:group_num].to_i).map do |worker_blocks|
      prepare_worker(worker_blocks, params)
    end
  else
    []
  end
end

Instance Attribute Details

#workersArray<Worker>

workers prepared in current batch

Returns:

  • (Array<Worker>)

    the current value of workers



15
16
17
# File 'lib/ripe/worker_controller/preparer.rb', line 15

def workers
  @workers
end

Instance Method Details

#load_workflow(workflow, params) ⇒ Proc, Hash<Symbol, String>

Load a workflow and return its callback and params components.

Parameters:

  • workflow (String)

    the name of a workflow

  • params (Hash<Symbol, String>)

    a list of worker-wide parameters

Returns:

  • (Proc, Hash<Symbol, String>)

    a list containing the workflow callback and default params



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/ripe/worker_controller/preparer.rb', line 58

def load_workflow(workflow, params)
  filename = Library.find(:workflow, workflow)
  abort "Could not find workflow #{workflow}." if filename == nil
  require_relative filename

  # Imports +$workflow+ from the workflow component.  This is a dirty
  # hack to help make the +DSL::WorkflowDSL+ more convenient for the
  # end user.

  params = {
    wd:        Dir.pwd,
    mode:      :patch,
    group_num: 1,
  }.merge($workflow.params.merge(params))

  [$workflow.callback, params]
end

#prepare_sample_blocks(samples, callback, params) ⇒ Hash

Apply the workflow (callback) to each sample, producing a single root block per sample.

Parameters:

  • samples (Array<String>)

    a list of samples

  • callback (Proc)

    workflow callback to be applied to each sample

  • params (Hash)

    a list of worker-wide parameters

Returns:

  • (Hash)

    a {sample => block} hash



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/ripe/worker_controller/preparer.rb', line 85

def prepare_sample_blocks(samples, callback, params)
  sample_blocks = samples.map do |sample|
    block = callback.call(sample, params)

    if block
      # No need to prune if callback returns nil
      block = block.prune(params[:mode].to_sym == :force,
                        params[:mode].to_sym == :depend)
    end

    if block != nil
      puts "Preparing sample #{sample}"
      {sample => block}
    else
      puts "Nothing to do for sample #{sample}"
      nil
    end
  end

  # Produce a {sample => block} hash
  sample_blocks.compact.inject(&:merge)
end

#prepare_worker(worker_sample_blocks, params) ⇒ DB::Worker

Prepare a worker from a group of sample blocks.

Parameters:

  • worker_sample_blocks (Hash)

    a list containing as many elements as there are samples in the group, with each element containing [String, Blocks::Block]

  • params (Hash)

    worker-level parameter list

Returns:



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/ripe/worker_controller/preparer.rb', line 117

def prepare_worker(worker_sample_blocks, params)
  worker = DB::Worker.create(handle: params[:handle])
  worker_blocks = prepare_worker_blocks(worker_sample_blocks, worker)

  # Combine all grouped sample blocks into a single worker block

  params = params.merge({
    name:    worker.id,
    stdout:  worker.stdout,
    stderr:  worker.stderr,
    command: Blocks::SerialBlock.new(*worker_blocks).command,
  })

  worker_block = Blocks::LiquidBlock.new("#{PATH}/share/moab.sh", params)
  File.open(worker.sh, 'w') { |f| f.write(worker_block.command) }

  worker.update({
    status:   :prepared,
    ppn:      params[:ppn],
    queue:    params[:queue],
    walltime: params[:walltime],
  })

  worker
end

#prepare_worker_blocks(worker_sample_blocks, worker) ⇒ Array<Blocks::Block>

Organize worker blocks into tasks and prepare them.

containing as many elements as there are samples in the group

Parameters:

Returns:

  • (Array<Blocks::Block>)

    a list of all the prepared blocks for a worker



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/ripe/worker_controller/preparer.rb', line 152

def prepare_worker_blocks(worker_sample_blocks, worker)
  worker_sample_blocks.map do |sample, block|
    # Preorder traversal of blocks -- assign incremental numbers starting from
    # 1 to each node as it is being traversed, as well as producing the job
    # file for each task.
    post_var_assign = lambda do |subblock|
      if subblock.blocks.length == 0
        # This section is only called when the subblock is actually a working
        # block (a leaf in the block arborescence).
        task = worker.tasks.create({
          sample: sample,
          block:  subblock.id,
        })

        File.open(task.sh, 'w') { |f| f.write(subblock.command) }
        subblock.vars.merge!(log: task.log)
      else
        subblock.blocks.each(&post_var_assign)
      end
    end

    post_var_assign.call(block)
    block
  end
end