Class: RemoteWorkflow

Inherits:
Object
  • Object
show all
Includes:
Workflow
Defined in:
lib/rbbt/workflow/remote_workflow.rb,
lib/rbbt/workflow/remote_workflow/driver.rb,
lib/rbbt/workflow/remote_workflow/driver/ssh.rb,
lib/rbbt/workflow/remote_workflow/driver/rest.rb

Defined Under Namespace

Modules: REST, SSH

Constant Summary collapse

# True only when the RBBT_DEBUG_REMOTE_JSON environment variable is exactly
# the string 'true'; any other value, or an unset variable, yields false.
RBBT_DEBUG_REMOTE_JSON =
ENV["RBBT_DEBUG_REMOTE_JSON"] == 'true'

Constants included from Workflow

Workflow::DEBUG_JOB_HASH, Workflow::DEFAULT_NAME, Workflow::FORGET_DEP_TASKS, Workflow::OUTPUT_FIELDS, Workflow::REMOVE_DEP_TASKS, Workflow::TAG

Instance Attribute Summary collapse

Attributes included from Workflow

#complete_name, #description, #documentation, #example_dir, #helpers, #last_task, #libdir, #load_step_cache, #relay_tasks, #remote_tasks, #step_cache, #task_dependencies, #task_description, #tasks, #workdir

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Workflow

#SOPT_str, __load_step, #_job, _load_step, #_prov_tasks, #add_remote_tasks, #all_exports, #assign_dep_inputs, complete_name, #dep, #dep_task, #dep_tree, #desc, #doc, doc_parse_chunks, doc_parse_first_line, doc_parse_up_to, #documentation_markdown, #documented_tasks, #example, #example_inputs, #example_step, #examples, #export_asynchronous, #export_exec, #export_stream, #export_synchronous, extended, #extension, #fast_load_id, fast_load_step, #get_SOPT, get_SOPT, #get_job_step, #helper, #id_for, #import, #import_task, init_relay_tasks, init_remote_tasks, installed_workflows, #job, #job_for_directory_inputs, job_path?, #jobs, #load_cromwell, #load_documentation, load_inputs, #load_name, load_relay_tasks, load_remote_tasks, load_step, #load_step, load_step_cache, load_workflow_file, load_workflow_libdir, #local_persist_setup, #local_workdir_setup, local_workflow_filename, #log, #make_local, #nextflow, #nextflow_dir, #nextflow_file, nextflow_file_params, nextflow_includes, #nextflow_project, nextflow_recursive_params, #override_dependencies, parse_nextflow_schema, parse_workflow_doc, plot_trace_job_times, process_relay_tasks, process_remote_tasks, #prov_string, #prov_tree, #real_dependencies, #rec_dependencies, #rec_input_defaults, #rec_input_descriptions, #rec_input_options, #rec_input_types, #rec_input_use, #rec_inputs, relocate, relocate_array, relocate_dependency, require_local_workflow, require_remote_workflow, require_workflow, resolve_locals, #resumable, #returns, #set_step_dependencies, #setup_override_dependency, #step_module, #step_path, #task, #task_exports, #task_for, #task_from_dep, #task_info, #task_inputs_from_directory, trace, trace_job_summary, trace_job_times, transplant, #unexport, #unlocated_override?, #with_workdir, workdir, workdir=, workflow_for

Methods included from InputModule

#input

Constructor Details

#initialize(url, name) ⇒ RemoteWorkflow

Returns a new instance of RemoteWorkflow.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/rbbt/workflow/remote_workflow.rb', line 7

# Builds a proxy for a workflow that lives on another host.
#
# The URL decides which driver module is mixed into this instance: any URL
# containing 'ssh://' gets RemoteWorkflow::SSH, everything else gets
# RemoteWorkflow::REST. Once the driver is in place, init_remote_tasks is
# called.
#
# @param url [String] location of the remote workflow
# @param name [Object] name of the workflow, stored in @name
def initialize(url, name)
  Log.debug { "Loading remote workflow #{ name }: #{ url }" }

  @url = url
  @name = name

  # Only the selected constant is evaluated, exactly one driver is extended.
  driver = url.include?('ssh://') ? RemoteWorkflow::SSH : RemoteWorkflow::REST
  extend driver

  init_remote_tasks
end

Instance Attribute Details

#asynchronous_exports ⇒ Object

Returns the value of attribute asynchronous_exports.



5
6
7
# File 'lib/rbbt/workflow/remote_workflow.rb', line 5

# Reader for @asynchronous_exports; returns nil until that variable is set.
def asynchronous_exports
  @asynchronous_exports
end

#exec_exports ⇒ Object

Returns the value of attribute exec_exports.



5
6
7
# File 'lib/rbbt/workflow/remote_workflow.rb', line 5

# Reader for @exec_exports; returns nil until that variable is set.
def exec_exports
  @exec_exports
end

#name ⇒ Object

Returns the value of attribute name.



5
6
7
# File 'lib/rbbt/workflow/remote_workflow.rb', line 5

# Reader for @name, the workflow name given to the constructor.
def name
  @name
end

#stream_exports ⇒ Object

Returns the value of attribute stream_exports.



5
6
7
# File 'lib/rbbt/workflow/remote_workflow.rb', line 5

# Reader for @stream_exports; returns nil until that variable is set.
def stream_exports
  @stream_exports
end

#synchronous_exports ⇒ Object

Returns the value of attribute synchronous_exports.



5
6
7
# File 'lib/rbbt/workflow/remote_workflow.rb', line 5

# Reader for @synchronous_exports; returns nil until that variable is set.
def synchronous_exports
  @synchronous_exports
end

#url ⇒ Object

Returns the value of attribute url.



5
6
7
# File 'lib/rbbt/workflow/remote_workflow.rb', line 5

# Reader for @url, the remote location given to the constructor.
def url
  @url
end

Class Method Details

.capture_exception ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/rbbt/workflow/remote_workflow/driver.rb', line 41

# Runs the given block and, when it fails with an exception that carries a
# `response`, tries to re-raise the error described by that response.
#
# The response text is handed to parse_exception:
# * an Exception result is raised as-is;
# * a String result is raised as a new exception of the original class with
#   that string as its message;
# * anything else (or any failure while parsing/building the replacement)
#   re-raises the original exception untouched.
#
# Exceptions that do not respond to `response` are always re-raised unchanged.
#
# Previously the replacement exception was raised inside a `begin ... rescue`
# whose bare `rescue` immediately caught it and re-raised the original, so the
# translation never took effect for StandardError subclasses. The replacement
# is now built inside the guarded section and raised outside of it.
#
# @yield the operation to guard
# @return [Object] the value of the block when it does not raise
# @raise [Exception] the translated exception, or the original one
def self.capture_exception
  yield
rescue Exception => e # everything is re-raised below, nothing is swallowed
  raise e unless e.respond_to? :response

  translated =
    begin
      parsed = parse_exception e.response.to_s
      case parsed
      when String
        # Same construction `raise e.class, parsed` would perform.
        e.class.exception(parsed)
      when Exception
        parsed
      end
    rescue StandardError
      # Parsing or building the replacement failed: fall back to the original.
      nil
    end

  raise(translated || e)
end

.fix_hash(hash, fix_values = false) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/rbbt/workflow/remote_workflow/driver.rb', line 5

# Returns a new Hash with every key converted with `to_sym` and values
# normalised by type:
#
# * true / false     -> kept
# * Hash             -> processed recursively with fix_hash
# * String           -> `to_sym` when fix_values is truthy, kept otherwise
# * IO               -> replaced by the result of `read`
# * TSV::Dumper      -> replaced by its `stream`
# * Step             -> get_stream(step), or step.load when that is nil/false
# * anything else    -> kept
#
# The String branch used to be written as `when (fix_values and String)`.
# With fix_values == nil that expression is `nil`, and `when nil` matches nil
# values, which then failed on `nil.to_sym`. The flag is now tested inside a
# plain `when String` branch, so nil values are passed through.
#
# NOTE(review): nested hashes are processed with the default
# fix_values = false, as before — confirm whether the flag should propagate.
#
# @param hash [Hash] source hash; keys must respond to `to_sym`
# @param fix_values [Boolean] when truthy, top-level String values become Symbols
# @return [Hash] the converted copy; the argument is not modified
def self.fix_hash(hash, fix_values = false)
  hash.each_with_object({}) do |(key, value), fixed|
    fixed[key.to_sym] =
      case value
      when TrueClass, FalseClass
        value
      when Hash
        fix_hash(value)
      when String
        fix_values ? value.to_sym : value
      when IO
        value.read
      when TSV::Dumper
        value.stream
      when Step
        get_stream(value) || value.load
      else
        value
      end
  end
end

.fix_params(params) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
# File 'lib/rbbt/workflow/remote_workflow/driver.rb', line 63

# Copies params into a new Hash, replacing every empty Array value with the
# marker string "EMPTY_ARRAY". All other values are copied unchanged.
#
# @param params [#each] key/value pairs to copy
# @return [Hash] the copy; the argument is not modified
def self.fix_params(params)
  params.each_with_object({}) do |(key, value), fixed|
    empty_list = Array === value && value.empty?
    fixed[key] = empty_list ? "EMPTY_ARRAY" : value
  end
end

.load_path(job_url) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/rbbt/workflow/remote_workflow.rb', line 61

# Builds a RemoteStep for an existing job from its URL.
#
# The URL is split on "/": the last segment (minus anything after "?") is the
# job name, the one before it is the task, and what is left is the workflow
# URL, whose own last segment is stored as the step's workflow.
#
# @param job_url [String] URL of the form <workflow url>/<task>/<job name>[?query]
# @return [RemoteStep] step marked as started, with result type and
#   description read from step.info
def self.load_path(job_url)
  segments = job_url.split("/")

  job_segment = segments.pop
  name = job_segment.split("?").first
  task = segments.pop
  base_url = segments.join("/")

  RemoteStep.new(base_url, task, nil).tap do |step|
    step.name = name
    step.workflow = segments.last
    step.started = true
    step.result_type = step.info[:result_type]
    step.result_description = step.info[:result_description]
  end
end

.parse_exception(text) ⇒ Object



31
32
33
34
35
36
37
38
39
# File 'lib/rbbt/workflow/remote_workflow/driver.rb', line 31

# Turns text of the form "<ClassName> => <message>" into an exception.
#
# Two defects of the earlier version are addressed:
# * the text is split only on the first " => ", so a message that itself
#   contains " => " is no longer truncated;
# * the named constant is instantiated only when it is an Exception subclass.
#   The text is produced by a remote host, and calling `new` on an arbitrary
#   constant (e.g. File) with remote-controlled input is not safe.
#
# NOTE(review): Kernel.const_get still resolves a name supplied by the remote
# side; consider an explicit allow-list if the remote end is not trusted.
#
# @param text [String] serialized error
# @return [Exception] an instance of the named class built with the message
# @return [String, nil] the bare message (nil when the text has no " => ")
#   when the class cannot be resolved, is not an Exception subclass, or cannot
#   be instantiated
def self.parse_exception(text)
  klass_name, message = text.split(" => ", 2)
  begin
    klass = Kernel.const_get klass_name
    return klass.new(message) if Class === klass && klass <= Exception

    message
  rescue StandardError
    message
  end
end

Instance Method Details

#__job(task, name = nil, inputs = {}) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/rbbt/workflow/remote_workflow.rb', line 26

# Builds the RemoteStep that represents running `task` on the remote workflow.
#
# Inputs are normalised first (keys are converted with to_sym):
# * TSV values are sent as their `to_s` form, whatever their declared type;
# * inputs the task's :input_types does not list are dropped;
# * for :tsv, :array, :file and :text inputs, a String that Open.exists?
#   reports as existing is replaced by Open.open(value);
# * every other value is passed through unchanged.
#
# When @can_stream is set, the first input whose options have :stream set is
# passed to the step as the stream input.
#
# @param task [Object] task name, used for task_info and the export lookups
# @param name [Object, nil] job name handed to RemoteStep
# @param inputs [Hash] input values keyed by input name
# @return [RemoteStep] the step, with its workflow set to self
def __job(task, name = nil, inputs = {})
  info = task_info(task)
  types = IndiferentHash.setup(info[:input_types])
  prepared = {}

  inputs.each do |key, value|
    key = key.to_sym

    if TSV === value
      prepared[key] = value.to_s
      next
    end

    type = types[key]
    next if type.nil?

    path_like = %i[tsv array file text].include?(type.to_sym)
    prepared[key] =
      if path_like && String === value && Open.exists?(value)
        Open.open(value)
      else
        value
      end
  end

  stream_input = nil
  if @can_stream
    streamed = task_info(task)[:input_options].select { |_input, options| options[:stream] }
    stream_input = streamed.collect { |input, _options| input }.first
  end

  step = RemoteStep.new(
    url, task, name, prepared,
    info[:input_types], info[:result_type], info[:result_description],
    @exec_exports.include?(task), @stream_exports.include?(task),
    stream_input
  )
  step.workflow = self
  step
end

#exported_tasks ⇒ Object



75
76
77
# File 'lib/rbbt/workflow/remote_workflow/driver.rb', line 75

# Concatenates the asynchronous, synchronous and exec export lists, in that
# order, into a new collection. @stream_exports is not part of the result.
#
# @return [Array] the combined export lists
def exported_tasks
  [@asynchronous_exports, @synchronous_exports, @exec_exports].reduce(:+)
end

#load_id(id) ⇒ Object



52
53
54
55
56
57
58
59
# File 'lib/rbbt/workflow/remote_workflow.rb', line 52

# Builds a RemoteStep for an existing job of this workflow.
#
# @param id [String] identifier of the form "<task>/<job name>"
# @return [RemoteStep] step pointing at this workflow's url, named after the
#   job, with result type and description taken from task_info(task)
def load_id(id)
  task, job_name = id.split("/")

  RemoteStep.new(url, task, nil).tap do |step|
    step.name = job_name
    step.result_type = task_info(task)[:result_type]
    step.result_description = task_info(task)[:result_description]
  end
end

#load_tasks ⇒ Object



79
80
81
# File 'lib/rbbt/workflow/remote_workflow/driver.rb', line 79

# Looks up every exported task in `tasks`, discarding the results.
# NOTE(review): presumably the lookup itself populates `tasks` lazily —
# confirm against the definition of `tasks`.
#
# @return [Array] the collection returned by exported_tasks
def load_tasks
  exported_tasks.each do |task_name|
    tasks[task_name]
  end
end

#to_s ⇒ Object



22
23
24
# File 'lib/rbbt/workflow/remote_workflow.rb', line 22

# Represents the workflow by its name: returns whatever #name returns,
# without converting it.
def to_s
  name
end