Module: WorkflowSSHClient
- Defined in:
- lib/rbbt/workflow/remote/ssh/get.rb,
lib/rbbt/workflow/remote/ssh/adaptor.rb
Class Method Summary collapse
- .__prepare_inputs_for_restclient(inputs) ⇒ Object
- .capture_exception ⇒ Object
- .execute_job(base_url, task, task_params, cache_type) ⇒ Object
- .fix_hash(hash, fix_values = false) ⇒ Object
- .fix_params(params) ⇒ Object
- .get_json(url, params = {}) ⇒ Object
- .parse_exception(text) ⇒ Object
- .post_jobname(url, inputs, input_types) ⇒ Object
Instance Method Summary collapse
- #clean ⇒ Object
- #documentation ⇒ Object
- #init_job(cache_type = nil, other_params = {}) ⇒ Object
- #init_remote_tasks ⇒ Object
- #load_tasks ⇒ Object
- #path ⇒ Object
- #run(*args) ⇒ Object
- #task_dependencies ⇒ Object
- #task_info(task) ⇒ Object
- #tasks ⇒ Object
- #workflow_description ⇒ Object
Class Method Details
.__prepare_inputs_for_restclient(inputs) ⇒ Object
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 2 def self.__prepare_inputs_for_restclient(inputs) inputs.each do |k,v| if v.respond_to? :path and not v.respond_to? :original_filename class << v def original_filename File.(path) end end end if Array === v and v.empty? inputs[k] = "EMPTY_ARRAY" end end end |
.capture_exception ⇒ Object
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 185 def self.capture_exception begin yield rescue Exception => e raise e unless e.respond_to? :response begin ne = parse_exception e.response.to_s case ne when String raise e.class, ne when Exception raise ne else raise end rescue raise e end raise $! end end |
.execute_job(base_url, task, task_params, cache_type) ⇒ Object
76 77 |
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 76 def self.execute_job(base_url, task, task_params, cache_type) end |
.fix_hash(hash, fix_values = false) ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 149 def self.fix_hash(hash, fix_values = false) fixed = {} hash.each do |key, value| fixed[key.to_sym] = case value when TrueClass value when FalseClass value when Hash fix_hash(value) when (fix_values and String ) value.to_sym when IO value.read when TSV::Dumper value.stream when Step stream = get_stream(value) stream || value.load else value end end fixed end |
.fix_params(params) ⇒ Object
207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 207 def self.fix_params(params) new_params = {} params.each do |k,v| if Array === v and v.empty? new_params[k] = "EMPTY_ARRAY" else new_params[k] = v end end new_params end |
.get_json(url, params = {}) ⇒ Object
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 219 def self.get_json(url, params = {}) Log.debug{ "SSHClient get_json: #{ url } - #{Misc.fingerprint params }" } params = params.merge({ :_format => 'json' }) params = fix_params params res = capture_exception do Misc.insist(2, 0.5) do SSHClient.get_json(url, :params => params) end end begin JSON.parse(res) rescue res end end |
.parse_exception(text) ⇒ Object
175 176 177 178 179 180 181 182 183 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 175 def self.parse_exception(text) klass, = text.split " => " begin klass = Kernel.const_get klass return klass.new rescue end end |
Instance Method Details
#clean ⇒ Object
275 276 277 278 279 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 275 def clean init_job SSHClient.clean(@url) if done? _restart end |
#documentation ⇒ Object
22 23 24 25 |
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 22 def documentation @documention ||= IndiferentHash.setup(WorkflowSSHClient.get_json(File.join(url, "documentation"))) @documention end |
#init_job(cache_type = nil, other_params = {}) ⇒ Object
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 242 def init_job(cache_type = nil, other_params = {}) cache_type = :asynchronous if cache_type.nil? and not @is_exec cache_type = :exec if cache_type.nil? @last_info_time = nil @done = false get_streams @name ||= Persist.memory("RemoteSteps", :workflow => self, :task => task, :jobname => @name, :inputs => inputs, :cache_type => cache_type) do Misc.insist do #@adaptor.post_jobname(File.join(base_url, task.to_s), inputs.merge(other_params).merge(:jobname => @name||@base_name, :_cache_type => cache_type)) input_types = {} @adaptor.post_jobname(File.join(base_url, task.to_s), inputs, input_types) end end if Open.remote? @name @url = @name @name = File.basename(@name) else @url = File.join(base_url, task.to_s, @name) end self end |
#init_remote_tasks ⇒ Object
70 71 72 73 74 |
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 70 def init_remote_tasks @task_info = IndiferentHash.setup(WorkflowSSHClient.get_json(url)) @exec_exports = @stream_exports = @synchronous_exports = [] @asynchronous_exports = @task_info.keys end |
#load_tasks ⇒ Object
56 57 58 |
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 56 def load_tasks @task_info.keys.each{|name| tasks[name]} end |
#path ⇒ Object
264 265 266 267 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 264 def path server, path = SSHClient.parse_url(url) "ssh://" + server + ":" + info[:path] end |
#run(*args) ⇒ Object
269 270 271 272 |
# File 'lib/rbbt/workflow/remote/ssh/get.rb', line 269 def run(*args) input_types = {} SSHClient.run_job(File.join(base_url, task.to_s), inputs, input_types) end |
#task_dependencies ⇒ Object
60 61 62 63 64 65 66 67 68 |
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 60 def task_dependencies @task_dependencies ||= Hash.new do |hash,task| hash[task] = if exported_tasks.include? task WorkflowSSHClient.get_json(File.join(url, task.to_s, 'dependencies')) else [] end end end |
#task_info(task) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 27 def task_info(task) @task_info ||= IndiferentHash.setup({}) @task_info[task] if @task_info[task].nil? task_info = WorkflowSSHClient.get_json(File.join(url, task.to_s, 'info')) task_info = WorkflowSSHClient.fix_hash(task_info) task_info[:result_type] = task_info[:result_type].to_sym task_info[:export] = task_info[:export].to_sym task_info[:input_types] = WorkflowSSHClient.fix_hash(task_info[:input_types], true) task_info[:inputs] = task_info[:inputs].collect{|input| input.to_sym } @task_info[task] = task_info end @task_info[task] end |
#tasks ⇒ Object
45 46 47 48 49 50 51 52 53 54 |
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 45 def tasks @tasks ||= Hash.new do |hash,task_name| info = @task_info[task_name] task = Task.setup info do |*args| raise "This is a remote task" end task.name = task_name.to_sym hash[task_name] = task end end |
#workflow_description ⇒ Object
18 19 20 |
# File 'lib/rbbt/workflow/remote/ssh/adaptor.rb', line 18 def workflow_description WorkflowSSHClient.get_raw(File.join(url, 'description')) end |