Module: WorkflowSSHClient
- Defined in:
- lib/rbbt/workflow/remote/ssh/get.rb,
lib/rbbt/workflow/remote/ssh/adaptor.rb
Class Method Summary collapse
- .capture_exception ⇒ Object
- .fix_hash(hash, fix_values = false) ⇒ Object
- .fix_params(params) ⇒ Object
- .get_json(url, params = {}) ⇒ Object
- .parse_exception(text) ⇒ Object
- .upload_inputs(server, inputs, input_types, input_id) ⇒ Object
Instance Method Summary collapse
- #clean ⇒ Object
- #documentation ⇒ Object
- #init_job(cache_type = nil, other_params = {}) ⇒ Object
- #init_remote_tasks ⇒ Object
- #load_tasks ⇒ Object
- #path ⇒ Object
- #produce(*args) ⇒ Object
- #run(*args) ⇒ Object
- #task_dependencies ⇒ Object
- #task_info(task) ⇒ Object
- #tasks ⇒ Object
- #workflow_description ⇒ Object
Class Method Details
.capture_exception ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 38

# Runs the given block and translates failures that carry a remote response.
#
# When the block raises an exception that responds to +response+, the
# response text is handed to +parse_exception+:
# * a String result is re-raised as the original exception class with that
#   string as message;
# * an Exception result is raised as-is;
# * anything else (or a failure while translating) re-raises the original
#   exception untouched.
#
# Exceptions that do not respond to +response+ are re-raised unchanged.
#
# @yield the operation to guard
# @return [Object] whatever the block returns when it does not raise
def self.capture_exception
  yield
rescue Exception => e # deliberately broad: anything without #response is re-raised immediately
  raise e unless e.respond_to? :response

  # Only the translation step is guarded. The previous version wrapped the
  # final `raise` in the same begin/rescue, so the translated exception was
  # itself rescued and replaced by the original one.
  translated =
    begin
      parsed = parse_exception e.response.to_s
      parsed.is_a?(String) ? e.class.exception(parsed) : parsed
    rescue StandardError
      nil
    end

  raise(translated.is_a?(Exception) ? translated : e)
end
.fix_hash(hash, fix_values = false) ⇒ Object
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 2

# Builds a new hash with symbolized keys and normalized values.
#
# Value handling, checked in this order:
# * +true+ / +false+ are kept;
# * a Hash is converted recursively (without +fix_values+);
# * a String becomes a Symbol, but only when +fix_values+ is set;
# * an IO is replaced by the result of +read+;
# * a TSV::Dumper is replaced by its +stream+;
# * a Step is replaced by +get_stream(step)+, or by +step.load+ when that
#   returns nothing;
# * anything else is kept.
#
# @param hash [Hash] source hash; keys must respond to +to_sym+
# @param fix_values [Boolean] also symbolize String values
# @return [Hash] a new hash; +hash+ itself is not modified
def self.fix_hash(hash, fix_values = false)
  hash.each_with_object({}) do |(key, value), result|
    symbol_key = key.to_sym
    result[symbol_key] =
      case value
      when TrueClass, FalseClass
        value
      when Hash
        fix_hash(value)
      when (fix_values and String)
        value.to_sym
      when IO
        value.read
      when TSV::Dumper
        value.stream
      when Step
        get_stream(value) || value.load
      else
        value
      end
  end
end
.fix_params(params) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 60

# Copies +params+, replacing every empty Array value with the marker string
# "EMPTY_ARRAY". All other values are copied unchanged.
#
# @param params [Hash] request parameters
# @return [Hash] a new hash; +params+ is not modified
def self.fix_params(params)
  params.each_with_object({}) do |(name, value), fixed|
    empty_list = Array === value && value.empty?
    fixed[name] = empty_list ? "EMPTY_ARRAY" : value
  end
end
.get_json(url, params = {}) ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 72

# Requests +url+ through SSHDriver.get_json and parses the reply as JSON.
#
# The parameters are extended with <tt>:_format => 'json'</tt> and passed
# through +fix_params+ before the request. The request runs inside
# +Misc.insist(2, 0.5)+ and +capture_exception+.
#
# @param url [String] remote location to query
# @param params [Hash] extra request parameters
# @return [Object] the parsed JSON, or the unparsed reply when parsing fails
def self.get_json(url, params = {})
  Log.debug{ "SSHClient get_json: #{ url } - #{Misc.fingerprint params }" }

  request_params = fix_params(params.merge({ :_format => 'json' }))

  reply = capture_exception do
    Misc.insist(2, 0.5) { SSHDriver.get_json(url, :params => request_params) }
  end

  # Best effort: a reply that is not valid JSON is handed back as received.
  begin
    JSON.parse(reply)
  rescue
    reply
  end
end
.parse_exception(text) ⇒ Object
28 29 30 31 32 33 34 35 36 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 28

# Turns an error description of the form "ClassName => message" into an
# exception instance.
#
# Only constants that resolve to Exception subclasses are instantiated; the
# text comes from a remote response, so arbitrary classes named in it are
# ignored rather than constructed. The part after the first " => " (if any)
# becomes the exception message.
#
# @param text [String, nil] error description
# @return [Exception, nil] the reconstructed exception, or +nil+ when the text
#   is empty, names no known exception class, or the class cannot be built
def self.parse_exception(text)
  class_name, message = text.to_s.split(" => ", 2)
  return nil if class_name.nil? || class_name.empty?

  klass = Kernel.const_get(class_name)
  return nil unless klass.is_a?(Class) && klass <= Exception

  message.nil? ? klass.new : klass.new(message)
rescue StandardError
  # Unknown constant, invalid constant name, or a constructor that rejects
  # these arguments: report "could not parse" instead of failing.
  nil
end
.upload_inputs(server, inputs, input_types, input_id) ⇒ Object
90 91 92 93 94 95 96 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 90

# Saves the job inputs into a temporary directory and copies that directory
# to the remote server under .rbbt/tmp/tmp-ssh_job_inputs/<input_id>.
#
# Nothing is copied when Step.save_inputs returns a falsy value.
#
# @param server [String] ssh destination
# @param inputs [Object] job inputs, passed to Step.save_inputs
# @param input_types [Object] input types, passed to Step.save_inputs
# @param input_id [String] name of the remote directory to create
# @return [Object] the result of the TmpFile.with_file block
def self.upload_inputs(server, inputs, input_types, input_id)
  require 'shellwords' # stdlib; required here because the file's require list is not visible

  TmpFile.with_file do |dir|
    if Step.save_inputs(inputs, input_types, dir)
      remote_dir = ".rbbt/tmp/tmp-ssh_job_inputs/"
      # Every interpolated value is shell-escaped; previously the scp target
      # was not quoted at all and the rest relied on bare single quotes.
      ssh_host = Shellwords.escape(server)
      local_dir = Shellwords.escape(dir)
      scp_target = Shellwords.escape("#{server}:#{remote_dir}#{input_id}")
      CMD.cmd("ssh #{ssh_host} mkdir -p #{remote_dir}; scp -r #{local_dir} #{scp_target}")
    end
  end
end
Instance Method Details
#clean ⇒ Object
145 146 147 148 149 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 145

# Re-initializes the job, asks SSHDriver to clean it when it is already
# done, and then calls +_restart+.
#
# @return [Object] the result of +_restart+
def clean
  init_job

  if done?
    SSHDriver.clean(@url, @input_id, @base_name)
  end

  _restart
end
#documentation ⇒ Object
8 9 10 11 |
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 8

# Fetches the workflow documentation from <url>/documentation and caches it.
#
# The cache lives in the instance variable +@documention+ (spelling kept as
# in the original, since it is object state).
#
# @return [Object] the fetched value after IndiferentHash.setup
def documentation
  unless @documention
    fetched = WorkflowSSHClient.get_json(File.join(url, "documentation"))
    @documention = IndiferentHash.setup(fetched)
  end

  @documention
end
#init_job(cache_type = nil, other_params = {}) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 101

# Prepares the remote job: uploads the inputs, posts the job if it has no
# name yet, and records its name and URL.
#
# Sets @last_info_time, @done, @server, @server_path, @input_id (generated
# once as "inputs-<random number>"), @input_types, @name and @url.
#
# @param cache_type [Symbol, nil] defaults to :exec when @is_exec is set and
#   to :asynchronous otherwise
# @param other_params [Hash] accepted for interface compatibility; not used
#   in this method
# @return [self]
def init_job(cache_type = nil, other_params = {})
  cache_type = (@is_exec ? :exec : :asynchronous) if cache_type.nil?

  @last_info_time = nil
  @done = false
  @server, @server_path = SSHDriver.parse_url base_url
  # Interpolation instead of `"inputs-" << ...`: appending to a string
  # literal fails when string literals are frozen.
  @input_id ||= "inputs-#{rand(100000)}"
  @input_types = task_info(task)[:input_types]

  WorkflowSSHClient.upload_inputs(@server, inputs, @input_types, @input_id)

  @name ||= Persist.memory("RemoteSteps", :workflow => self, :task => task, :jobname => @name, :inputs => inputs, :cache_type => cache_type) do
    Misc.insist do
      SSHDriver.post_job(File.join(base_url, task.to_s), @input_id, @base_name)
    end
  end

  # A remote @name is already the job location; otherwise build it from
  # base_url and the task.
  if Open.remote? @name
    @url = @name
    @name = File.basename(@name)
  else
    @url = File.join(base_url, task.to_s, @name)
  end

  self
end
#init_remote_tasks ⇒ Object
56 57 58 59 60 |
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 56

# Loads the remote task listing from +url+ and resets the export lists.
#
# Every key of the listing is recorded as an asynchronous export; the exec,
# stream and synchronous lists start empty.
#
# @return [Array] the asynchronous exports
def init_remote_tasks
  @task_info = IndiferentHash.setup(WorkflowSSHClient.get_json(url))

  # One array per list. The chained assignment used before made all three
  # variables point at the same Array, so appending to one changed them all.
  @exec_exports = []
  @stream_exports = []
  @synchronous_exports = []

  @asynchronous_exports = @task_info.keys
end
#load_tasks ⇒ Object
42 43 44 |
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 42

# Looks up every task named in @task_info through +tasks+.
#
# @return [Array] the keys of @task_info
def load_tasks
  task_names = @task_info.keys
  task_names.each do |task_name|
    tasks[task_name]
  end
end
#path ⇒ Object
127 128 129 130 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 127

# Builds the ssh:// location of the job result from the server parsed out of
# @base_url and from @remote_path.
#
# Also refreshes @server and @server_path.
#
# @return [String] "ssh://<server>:<remote path>"
def path
  parsed = SSHDriver.parse_url @base_url
  @server, @server_path = parsed

  location = "ssh://" + @server
  location + ":" + @remote_path
end
#produce(*args) ⇒ Object
132 133 134 135 136 137 138 139 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 132

# Initializes the job, starts it through SSHDriver.run_job and waits,
# checking +done?+ once per second, until it has finished.
#
# The value returned by SSHDriver.run_job is stored in @remote_path.
#
# @param args [Array] accepted for interface compatibility; not used in this
#   method
# @return [nil]
def produce(*args)
  init_job
  @remote_path = SSHDriver.run_job(File.join(base_url, task.to_s), @input_id, @base_name)
  sleep 1 until done?
end
#run(*args) ⇒ Object
141 142 143 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 141

# Runs the job by delegating to +produce+ with the same arguments.
#
# @param args [Array] forwarded unchanged to +produce+
# @return [Object] whatever +produce+ returns
def run(*args)
  produce(*args)
end
#task_dependencies ⇒ Object
46 47 48 49 50 51 52 53 54 |
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 46

# Lazily filled lookup table from task to its dependencies.
#
# On first access of a task, exported tasks are resolved by fetching
# <url>/<task>/dependencies; any other task gets an empty list. The result
# is stored in the table.
#
# @return [Hash] the memoized lookup table
def task_dependencies
  @task_dependencies ||= Hash.new do |cache, task|
    dependencies =
      if exported_tasks.include? task
        WorkflowSSHClient.get_json(File.join(url, task.to_s, 'dependencies'))
      else
        []
      end

    cache[task] = dependencies
  end
end
#task_info(task) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 13

# Returns the description of +task+, fetching it from <@base_url>/<task> the
# first time and caching it in @task_info.
#
# After fetching, keys are symbolized through WorkflowSSHClient.fix_hash,
# :result_type and :export are turned into Symbols when present,
# :input_types is converted with value fixing enabled, and every entry of
# :inputs becomes a Symbol.
#
# @param task [String, Symbol] task name
# @return [Object] the cached description after IndiferentHash.setup
def task_info(task)
  @task_info ||= IndiferentHash.setup({})

  if @task_info[task].nil?
    fetched = WorkflowSSHClient.get_json(File.join(@base_url, task.to_s))
    info = WorkflowSSHClient.fix_hash(fetched)

    [:result_type, :export].each do |field|
      info[field] = info[field].to_sym if info[field]
    end

    info[:input_types] = WorkflowSSHClient.fix_hash(info[:input_types], true)
    info[:inputs] = info[:inputs].map(&:to_sym)

    @task_info[task] = IndiferentHash.setup(info)
  end

  IndiferentHash.setup(@task_info[task])
end
#tasks ⇒ Object
31 32 33 34 35 36 37 38 39 40 |
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 31

# Lazily filled lookup table from task name to a placeholder Task.
#
# On first access of a name, a Task is set up from @task_info[name] with a
# body that raises "This is a remote task", its name is set to the
# symbolized task name, and it is stored in the table.
#
# @return [Hash] the memoized lookup table
def tasks
  @tasks ||= Hash.new do |registry, task_name|
    remote_task = Task.setup(@task_info[task_name]) do |*args|
      raise "This is a remote task"
    end
    remote_task.name = task_name.to_sym

    registry[task_name] = remote_task
  end
end
#workflow_description ⇒ Object
4 5 6 |
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 4

# Fetches the workflow description from <url>/description.
#
# NOTE(review): WorkflowSSHClient.get_raw does not appear among the class
# methods listed for this module on this page — confirm it is defined
# somewhere, otherwise this call raises NoMethodError.
#
# @return [Object] whatever WorkflowSSHClient.get_raw returns
def workflow_description
  description_url = File.join(url, 'description')
  WorkflowSSHClient.get_raw(description_url)
end