Module: RemoteStep::SSH
- Defined in:
- lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb
Instance Attribute Summary collapse
-
#batch_options ⇒ Object
Returns the value of attribute batch_options.
-
#override_dependencies ⇒ Object
Returns the value of attribute override_dependencies.
-
#produce_dependencies ⇒ Object
Returns the value of attribute produce_dependencies.
-
#run_type ⇒ Object
Returns the value of attribute run_type.
Instance Method Summary collapse
- #_orchestrate_batch ⇒ Object
- #_run ⇒ Object
- #_run_batch ⇒ Object
- #abort ⇒ Object
- #clean ⇒ Object
- #init_job(cache_type = nil, other_params = {}) ⇒ Object
- #input_dependencies ⇒ Object
- #issue ⇒ Object
- #load ⇒ Object
- #path ⇒ Object
- #produce(*args) ⇒ Object
- #run(stream = nil) ⇒ Object
Instance Attribute Details
#batch_options ⇒ Object
Returns the value of attribute batch_options.
3 4 5 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 3 def batch_options @batch_options end |
#override_dependencies ⇒ Object
Returns the value of attribute override_dependencies.
3 4 5 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 3 def override_dependencies @override_dependencies end |
#produce_dependencies ⇒ Object
Returns the value of attribute produce_dependencies.
3 4 5 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 3 def produce_dependencies @produce_dependencies end |
#run_type ⇒ Object
Returns the value of attribute run_type.
3 4 5 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 3 def run_type @run_type end |
Instance Method Details
#_orchestrate_batch ⇒ Object
64 65 66 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 64
# Hands this job to RemoteWorkflow::SSH.orchestrate_batch_job, addressed by
# the task URL (base_url joined with the task name), the input id and the
# base name. An empty hash is passed when @batch_options is unset.
#
# @return [Object] whatever RemoteWorkflow::SSH.orchestrate_batch_job returns
def _orchestrate_batch
  task_url = File.join(base_url, task.to_s)
  options = @batch_options || {}
  RemoteWorkflow::SSH.orchestrate_batch_job(task_url, @input_id, @base_name, options)
end
#_run ⇒ Object
54 55 56 57 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 54
# Uploads this step's dependencies to @server and then starts the job through
# RemoteWorkflow::SSH.run_job.
#
# @return [Object] whatever RemoteWorkflow::SSH.run_job returns
def _run
  ssh = RemoteWorkflow::SSH
  ssh.upload_dependencies(self, @server, 'user', @produce_dependencies)

  task_url = File.join(base_url, task.to_s)
  ssh.run_job(task_url, @input_id, @base_name)
end
#_run_batch ⇒ Object
59 60 61 62 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 59
# Uploads this step's dependencies to @server and then starts the job through
# RemoteWorkflow::SSH.run_batch_job. An empty hash is passed when
# @batch_options is unset.
#
# @return [Object] whatever RemoteWorkflow::SSH.run_batch_job returns
def _run_batch
  ssh = RemoteWorkflow::SSH
  ssh.upload_dependencies(self, @server, 'user', @produce_dependencies)

  task_url = File.join(base_url, task.to_s)
  options = @batch_options || {}
  ssh.run_batch_job(task_url, @input_id, @base_name, options)
end
#abort ⇒ Object
110 111 112 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 110
# Placeholder: nothing is aborted, the method only logs a warning saying the
# operation is not implemented.
#
# @return [Object] whatever Log.warn returns
def abort
  notice = "not implemented RemoteWorkflow::SSH.abort(@url, @input_id, @base_name)"
  Log.warn(notice)
end
#clean ⇒ Object
104 105 106 107 108 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 104
# Cleans the job through RemoteWorkflow::SSH.clean and then calls _restart.
#
# @return [Object] whatever _restart returns
def clean
  # init_job runs first; it is the method that assigns @url and @input_id.
  init_job

  ssh = RemoteWorkflow::SSH
  ssh.clean(@url, @input_id, @base_name)

  _restart
end
#init_job(cache_type = nil, other_params = {}) ⇒ Object
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 5
# Prepares this step on the remote side and records where it lives.
#
# Returns immediately when @url is already set. Otherwise it resolves the
# cache type, parses base_url into @server / @server_path, assigns an input
# id, applies any "name=value" override dependencies to inputs, uploads the
# inputs, posts the job, and derives @name and @url from the remote path.
#
# @param cache_type [Symbol, nil] passed on to Persist.memory; nil becomes
#   :exec when @is_exec is set and :asynchronous otherwise
# @param other_params [Hash] accepted but not read by this method
# @return [self]
def init_job(cache_type = nil, other_params = {})
  return self if @url

  cache_type = (@is_exec ? :exec : :asynchronous) if cache_type.nil?

  @last_info_time = nil
  @done = false
  @server, @server_path = RemoteWorkflow::SSH.parse_url(base_url)

  # Interpolation builds a new string; appending to the literal with << would
  # mutate it and raise FrozenError when string literals are frozen.
  @input_id ||= "inputs-#{rand(100000)}"

  if override_dependencies && override_dependencies.any?
    override_dependencies.each do |override|
      # partition splits on the first "=" only, so the value may contain "=".
      input_name, _sep, value = override.partition("=")
      inputs[input_name] = value
    end
  end

  # NOTE(review): inputs is indexed by name above; if it is a plain Hash this
  # select tests its entries rather than its values -- confirm that is intended.
  inputs.select { |input| Step === input }.each { |input| input.produce }

  RemoteWorkflow::SSH.upload_inputs(@server, inputs, @input_types, @input_id)

  @remote_path ||= Persist.memory("RemoteStep", workflow: workflow, task: task, jobname: @name, inputs: inputs, cache_type: cache_type) do
    Misc.insist do
      RemoteWorkflow::SSH.post_job(File.join(base_url, task.to_s), @input_id, @base_name)
    end
  end

  # The job name is the last segment of the remote path.
  @name = @remote_path.split("/").last

  if Open.remote?(@name)
    @url = @name
    @name = File.basename(@name)
  else
    @url = File.join(base_url, task.to_s, @name)
  end

  self
end
#input_dependencies ⇒ Object
114 115 116 117 118 119 120 121 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 114
# Collects the steps this job's inputs depend on and memoizes the list in
# @input_dependencies.
#
# Two kinds of input value contribute, in this order:
# 1. values that are themselves a Step (or a RemoteStep, when that constant
#    is defined);
# 2. values that are a Path whose resource is a Step -- the resource is what
#    gets collected.
#
# @return [Array] matching steps, direct ones first
def input_dependencies
  @input_dependencies ||= begin
    # Flatten once and reuse for both filters.
    values = inputs.values.flatten

    direct = values.select do |value|
      Step === value || (defined?(RemoteStep) && RemoteStep === value)
    end

    # A further filter was left disabled in the original listing:
    #select{|dep| ! dep.resource.started? }. # Ignore input_deps already started
    via_path = values.
      select { |value| Path === value && Step === value.resource }.
      map { |value| value.resource }

    direct + via_path
  end
end
#issue ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 68
# Initializes the job and launches it according to @run_type, storing the
# launcher's return value in @remote_path and marking the step as started.
#
# @run_type 'run' / :run / nil  -> _run
# @run_type 'batch' / :batch    -> _run_batch
# @run_type 'orchestrate' / :orchestrate -> _orchestrate_batch
# Any other value matches no branch, so @remote_path becomes nil.
#
# @return [true]
def issue
  init_job

  @remote_path =
    case @run_type
    when 'run', :run, nil
      _run
    when 'batch', :batch
      _run_batch
    when 'orchestrate', :orchestrate
      _orchestrate_batch
    end

  @started = true
end
#load ⇒ Object
91 92 93 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 91
# Opens this step's path with Open.open and passes the result to load_res.
#
# @return [Object] whatever load_res returns
def load
  opened = Open.open(path)
  load_res(opened)
end
#path ⇒ Object
43 44 45 46 47 48 49 50 51 52 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 43
# Builds the "ssh://<server>:<location>" address of this step's result.
#
# @base_url is re-parsed on every call, refreshing @server and @server_path.
# The location is the first available of:
# 1. info[:path]
# 2. @remote_path
# 3. "var/jobs/<workflow>/<task_name>/<@name>"
#
# @return [String]
def path
  @server, @server_path = RemoteWorkflow::SSH.parse_url(@base_url)

  # Read info[:path] once so the value tested is the value used.
  info_path = info[:path]

  location =
    if info_path
      info_path
    elsif @remote_path
      @remote_path
    else
      ["var/jobs", workflow.to_s, task_name.to_s, @name].join("/")
    end

  "ssh://" + @server + ":" + location
end
#produce(*args) ⇒ Object
82 83 84 85 86 87 88 89 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 82
# Issues the job and blocks, polling once per second, until it reports done,
# error or aborted.
#
# @param args [Array] accepted but not read by this method
# @return [self]
# @raise [Exception] the value of get_exception when the job ended in error
def produce(*args)
  issue

  sleep 1 until done? || error? || aborted?

  raise get_exception if error?

  self
end
#run(stream = nil) ⇒ Object
95 96 97 98 99 100 101 102 |
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 95
# Runs the job. With a truthy stream argument the job is only issued;
# otherwise it is produced and then loaded.
#
# @param stream [Object, nil] truthy to issue without waiting
# @return [Object] the result of issue, or of load (nil when load is skipped)
def run(stream = nil)
  return issue if stream

  produce
  # NOTE(review): args is not a parameter or local of this method; presumably
  # the including class provides it -- confirm, otherwise this line raises
  # NameError.
  self.load unless args.first
end