Class: RemoteWorkflow
- Inherits:
-
Object
- Object
- RemoteWorkflow
- Includes:
- Workflow
- Defined in:
- lib/rbbt/workflow/remote_workflow.rb,
lib/rbbt/workflow/remote_workflow/driver.rb,
lib/rbbt/workflow/remote_workflow/driver/ssh.rb,
lib/rbbt/workflow/remote_workflow/driver/rest.rb
Defined Under Namespace
Constant Summary collapse
- RBBT_DEBUG_REMOTE_JSON =
ENV["RBBT_DEBUG_REMOTE_JSON"] == 'true'
Constants included from Workflow
Workflow::DEBUG_JOB_HASH, Workflow::DEFAULT_NAME, Workflow::FORGET_DEP_TASKS, Workflow::OUTPUT_FIELDS, Workflow::REMOVE_DEP_TASKS, Workflow::TAG
Instance Attribute Summary collapse
-
#asynchronous_exports ⇒ Object
Returns the value of attribute asynchronous_exports.
-
#exec_exports ⇒ Object
Returns the value of attribute exec_exports.
-
#name ⇒ Object
Returns the value of attribute name.
-
#stream_exports ⇒ Object
Returns the value of attribute stream_exports.
-
#synchronous_exports ⇒ Object
Returns the value of attribute synchronous_exports.
-
#url ⇒ Object
Returns the value of attribute url.
Attributes included from Workflow
#complete_name, #description, #documentation, #example_dir, #helpers, #last_task, #libdir, #load_step_cache, #relay_tasks, #remote_tasks, #step_cache, #task_dependencies, #task_description, #tasks, #workdir
Class Method Summary collapse
- .capture_exception ⇒ Object
- .fix_hash(hash, fix_values = false) ⇒ Object
- .fix_params(params) ⇒ Object
- .load_path(job_url) ⇒ Object
- .parse_exception(text) ⇒ Object
Instance Method Summary collapse
- #__job(task, name = nil, inputs = {}) ⇒ Object
- #exported_tasks ⇒ Object
-
#initialize(url, name) ⇒ RemoteWorkflow
constructor
A new instance of RemoteWorkflow.
- #load_id(id) ⇒ Object
- #load_tasks ⇒ Object
- #to_s ⇒ Object
Methods included from Workflow
#SOPT_str, __load_step, #_job, _load_step, #_prov_tasks, #add_remote_tasks, #all_exports, #assign_dep_inputs, complete_name, #dep, #dep_task, #dep_tree, #desc, #doc, doc_parse_chunks, doc_parse_first_line, doc_parse_up_to, #documentation_markdown, #documented_tasks, #example, #example_inputs, #example_step, #examples, #export_asynchronous, #export_exec, #export_stream, #export_synchronous, extended, #extension, #fast_load_id, fast_load_step, #get_SOPT, get_SOPT, #get_job_step, #helper, #id_for, #import, #import_task, init_relay_tasks, init_remote_tasks, installed_workflows, #job, #job_for_directory_inputs, job_path?, #jobs, #load_cromwell, #load_documentation, load_inputs, #load_name, load_relay_tasks, load_remote_tasks, load_step, #load_step, load_step_cache, load_workflow_file, load_workflow_libdir, #local_persist_setup, #local_workdir_setup, local_workflow_filename, #log, #make_local, #nextflow, #nextflow_dir, #nextflow_file, nextflow_file_params, nextflow_includes, #nextflow_project, nextflow_recursive_params, #override_dependencies, parse_nextflow_schema, parse_workflow_doc, plot_trace_job_times, process_relay_tasks, process_remote_tasks, #prov_string, #prov_tree, #real_dependencies, #rec_dependencies, #rec_input_defaults, #rec_input_descriptions, #rec_input_options, #rec_input_types, #rec_input_use, #rec_inputs, relocate, relocate_array, relocate_dependency, require_local_workflow, require_remote_workflow, require_workflow, resolve_locals, #resumable, #returns, #set_step_dependencies, #setup_override_dependency, #step_module, #step_path, #task, #task_exports, #task_for, #task_from_dep, #task_info, #task_inputs_from_directory, trace, trace_job_summary, trace_job_times, transplant, #unexport, #unlocated_override?, #with_workdir, workdir, workdir=, workflow_for
Methods included from InputModule
Constructor Details
#initialize(url, name) ⇒ RemoteWorkflow
Returns a new instance of RemoteWorkflow.
7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/rbbt/workflow/remote_workflow.rb', line 7

# Sets up a workflow proxy for the remote endpoint +url+, known as +name+.
#
# The driver module mixed into this instance depends on the URL: a URL
# containing 'ssh://' gets RemoteWorkflow::SSH, any other URL gets
# RemoteWorkflow::REST. init_remote_tasks is called once the driver is in place.
#
# @param url [String] location of the remote workflow
# @param name [Object] name stored in @name and returned by #name
def initialize(url, name)
  Log.debug{ "Loading remote workflow #{ name }: #{ url }" }

  @url = url
  @name = name

  # Only the selected constant is resolved, exactly as in an if/else.
  driver = url.include?('ssh://') ? RemoteWorkflow::SSH : RemoteWorkflow::REST
  extend driver

  init_remote_tasks
end
Instance Attribute Details
#asynchronous_exports ⇒ Object
Returns the value of attribute asynchronous_exports.
5 6 7 |
# File 'lib/rbbt/workflow/remote_workflow.rb', line 5

# Reader for the @asynchronous_exports instance variable.
#
# @return [Object] the current value of @asynchronous_exports
def asynchronous_exports
  @asynchronous_exports
end
#exec_exports ⇒ Object
Returns the value of attribute exec_exports.
5 6 7 |
# File 'lib/rbbt/workflow/remote_workflow.rb', line 5

# Reader for the @exec_exports instance variable.
#
# @return [Object] the current value of @exec_exports
def exec_exports
  @exec_exports
end
#name ⇒ Object
Returns the value of attribute name.
5 6 7 |
# File 'lib/rbbt/workflow/remote_workflow.rb', line 5

# Reader for the @name instance variable.
#
# @return [Object] the current value of @name
def name
  @name
end
#stream_exports ⇒ Object
Returns the value of attribute stream_exports.
5 6 7 |
# File 'lib/rbbt/workflow/remote_workflow.rb', line 5

# Reader for the @stream_exports instance variable.
#
# @return [Object] the current value of @stream_exports
def stream_exports
  @stream_exports
end
#synchronous_exports ⇒ Object
Returns the value of attribute synchronous_exports.
5 6 7 |
# File 'lib/rbbt/workflow/remote_workflow.rb', line 5

# Reader for the @synchronous_exports instance variable.
#
# @return [Object] the current value of @synchronous_exports
def synchronous_exports
  @synchronous_exports
end
#url ⇒ Object
Returns the value of attribute url.
5 6 7 |
# File 'lib/rbbt/workflow/remote_workflow.rb', line 5

# Reader for the @url instance variable.
#
# @return [Object] the current value of @url
def url
  @url
end
Class Method Details
.capture_exception ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/rbbt/workflow/remote_workflow/driver.rb', line 41

# Runs the given block and translates errors that carry a +response+.
#
# When the block raises an error responding to +response+, the response
# text is handed to parse_exception and the outcome decides what is raised:
#
# * a String    -> a new error of the original class with that message
# * an Exception -> that exception
# * anything else, or a failure while parsing -> the original error
#
# Errors without a +response+ are re-raised untouched.
#
# Previously the translated error was raised inside the same begin/rescue
# that guarded the parsing, so any translated StandardError was caught
# again and replaced by the original error. The parsing is now guarded on
# its own and the raise happens afterwards.
#
# @yield the operation to guard
# @return [Object] whatever the block returns when it does not raise
# @raise [Exception] the translated or the original error
def self.capture_exception
  yield
rescue Exception => e # broad on purpose: every path below re-raises
  raise e unless e.respond_to?(:response)

  # Only the parsing is best-effort; a failure here falls back to the original error.
  translated =
    begin
      parse_exception(e.response.to_s)
    rescue StandardError
      nil
    end

  case translated
  when String
    raise e.class, translated
  when Exception
    raise translated
  else
    raise e
  end
end
.fix_hash(hash, fix_values = false) ⇒ Object
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/rbbt/workflow/remote_workflow/driver.rb', line 5

# Builds a new Hash from +hash+ with every key converted through to_sym
# and some values normalized, checked in this order:
#
# * true / false         -> kept
# * Hash                 -> fix_hash(value) (fix_values is not forwarded)
# * String (only when fix_values is set) -> value.to_sym
# * IO                   -> value.read
# * TSV::Dumper          -> value.stream
# * Step                 -> get_stream(value), or value.load when that is nil/false
# * anything else        -> kept
#
# @param hash [Hash] the hash to convert; it is not modified
# @param fix_values [Boolean] whether String values become Symbols
# @return [Hash] the converted copy
def self.fix_hash(hash, fix_values = false)
  # String when fix_values is truthy, otherwise fix_values itself; used
  # directly as a `when` pattern so matching behaves as it always did.
  string_matcher = (fix_values && String)

  hash.each_with_object({}) do |(key, value), fixed|
    fixed[key.to_sym] =
      case value
      when TrueClass, FalseClass then value
      when Hash                  then fix_hash(value)
      when string_matcher        then value.to_sym
      when IO                    then value.read
      when TSV::Dumper           then value.stream
      when Step                  then get_stream(value) || value.load
      else value
      end
  end
end
.fix_params(params) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/rbbt/workflow/remote_workflow/driver.rb', line 63

# Copies +params+ into a new Hash, replacing every empty Array value with
# the marker string "EMPTY_ARRAY". All other entries are copied unchanged.
#
# @param params [Hash] the parameters to copy; it is not modified
# @return [Hash] the copy with empty arrays replaced
def self.fix_params(params)
  params.each_with_object({}) do |(key, value), fixed|
    empty_array = Array === value && value.empty?
    fixed[key] = empty_array ? "EMPTY_ARRAY" : value
  end
end
.load_path(job_url) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/rbbt/workflow/remote_workflow.rb', line 61

# Builds a RemoteStep from a job URL shaped like <base>/<workflow>/<task>/<job>.
#
# The last path segment, cut at the first "?", becomes the step name; the
# segment before it is the task; what remains is the step URL, and its
# last segment is stored as the step's workflow. The step is marked as
# started and its result type and description are taken from step.info.
#
# @param job_url [String] URL of the remote job
# @return [RemoteStep] the configured step
def self.load_path(job_url)
  segments = job_url.split("/")

  job_segment = segments.pop
  name = job_segment.split("?").first
  task = segments.pop

  # After the two pops only the workflow URL is left.
  workflow = segments.last
  url = segments.join("/")

  RemoteStep.new(url, task, nil).tap do |step|
    step.name = name
    step.workflow = workflow
    step.started = true
    step.result_type = step.info[:result_type]
    step.result_description = step.info[:result_description]
  end
end
.parse_exception(text) ⇒ Object
31 32 33 34 35 36 37 38 39 |
# File 'lib/rbbt/workflow/remote_workflow/driver.rb', line 31

# Turns an error description of the form "ClassName => ..." into a new
# instance of that exception class.
#
# Only the part before the first " => " is used. The name is resolved with
# Kernel.const_get and instantiated without arguments, but only when it
# refers to an Exception subclass. The text comes from a remote response,
# and the previous version instantiated whatever constant it named
# (e.g. "Object" or "String").
#
# @param text [String] the error description
# @return [Exception, nil] the new exception, or nil when the name is
#   unknown, is not an Exception class, or cannot be instantiated
def self.parse_exception(text)
  class_name, = text.split(" => ")

  begin
    klass = Kernel.const_get(class_name)
    # Never instantiate arbitrary constants named by the remote side.
    return nil unless klass.is_a?(Class) && klass <= Exception

    klass.new
  rescue StandardError
    # Unknown constant, invalid name, or a constructor that needs arguments.
    nil
  end
end
Instance Method Details
#__job(task, name = nil, inputs = {}) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/rbbt/workflow/remote_workflow.rb', line 26

# Creates the RemoteStep for running +task+ on the remote workflow.
#
# Inputs are prepared first:
# * keys are converted to symbols;
# * TSV inputs are sent as their to_s form, whatever the declared type;
# * other inputs with no declared type in the task info are dropped;
# * for :tsv, :array, :file and :text inputs, a String for which
#   Open.exists? is true is replaced by Open.open of it;
# * everything else is passed along unchanged.
#
# When @can_stream is set, the first input whose options have :stream set
# is passed to the step as its stream input.
#
# @param task [Object] task identifier, as accepted by task_info
# @param name [Object, nil] job name handed to the step
# @param inputs [Hash] input values keyed by input name
# @return [RemoteStep] the step, with this workflow assigned to it
def __job(task, name = nil, inputs = {})
  info = task_info(task)
  input_types = IndiferentHash.setup(info[:input_types])
  openable_types = [:tsv, :array, :file, :text]

  fixed_inputs = {}
  inputs.each do |key, value|
    key = key.to_sym

    if TSV === value
      fixed_inputs[key] = value.to_s
      next
    end

    type = input_types[key]
    next if type.nil?

    open_it = openable_types.include?(type.to_sym) && String === value && Open.exists?(value)
    fixed_inputs[key] = open_it ? Open.open(value) : value
  end

  stream_input = nil
  if @can_stream
    streamed = task_info(task)[:input_options].select { |_key, options| options[:stream] }
    stream_input = streamed.collect { |key, _options| key }.first
  end

  step = RemoteStep.new(
    url, task, name, fixed_inputs,
    info[:input_types], info[:result_type], info[:result_description],
    @exec_exports.include?(task), @stream_exports.include?(task),
    stream_input
  )
  step.workflow = self
  step
end
#exported_tasks ⇒ Object
75 76 77 |
# File 'lib/rbbt/workflow/remote_workflow/driver.rb', line 75

# Concatenates the asynchronous, synchronous and exec exports, in that
# order. @stream_exports is not part of the result.
#
# @return [Object] @asynchronous_exports + @synchronous_exports + @exec_exports
def exported_tasks
  [@asynchronous_exports, @synchronous_exports, @exec_exports].inject(:+)
end
#load_id(id) ⇒ Object
52 53 54 55 56 57 58 59 |
# File 'lib/rbbt/workflow/remote_workflow.rb', line 52

# Builds a RemoteStep for an identifier of the form "task/name".
#
# The step points at this workflow's url, takes the second part of the
# identifier as its name, and gets its result type and description from
# the task info of the first part.
#
# @param id [String] identifier of the form "task/name"
# @return [RemoteStep] the configured step
def load_id(id)
  task, name = id.split("/")

  RemoteStep.new(url, task, nil).tap do |step|
    step.name = name
    step.result_type = task_info(task)[:result_type]
    step.result_description = task_info(task)[:result_description]
  end
end
#load_tasks ⇒ Object
79 80 81 |
# File 'lib/rbbt/workflow/remote_workflow/driver.rb', line 79

# Looks up every exported task name in +tasks+, discarding the results.
# NOTE(review): presumably the lookup itself loads the task definition —
# confirm against the driver's +tasks+ implementation.
#
# @return [Object] the collection returned by exported_tasks
def load_tasks
  task_names = exported_tasks
  task_names.each do |task_name|
    tasks[task_name]
  end
end
#to_s ⇒ Object
22 23 24 |
# File 'lib/rbbt/workflow/remote_workflow.rb', line 22

# String form of the workflow: whatever #name returns.
#
# @return [Object] the value of #name
def to_s
  name
end