Module: RemoteStep::SSH

Defined in:
lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#batch_options ⇒ Object

Returns the value of attribute batch_options.



3
4
5
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 3

# Reader for the batch_options attribute.
#
# @return [Object, nil] the value currently held in @batch_options
#   (nil when it has never been assigned)
def batch_options
  instance_variable_get(:@batch_options)
end

#override_dependencies ⇒ Object

Returns the value of attribute override_dependencies.



3
4
5
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 3

# Reader for the override_dependencies attribute.
#
# @return [Object, nil] the value currently held in @override_dependencies
#   (nil when it has never been assigned)
def override_dependencies
  instance_variable_get(:@override_dependencies)
end

#produce_dependencies ⇒ Object

Returns the value of attribute produce_dependencies.



3
4
5
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 3

# Reader for the produce_dependencies attribute.
#
# @return [Object, nil] the value currently held in @produce_dependencies
#   (nil when it has never been assigned)
def produce_dependencies
  instance_variable_get(:@produce_dependencies)
end

#run_type ⇒ Object

Returns the value of attribute run_type.



3
4
5
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 3

# Reader for the run_type attribute.
#
# @return [Object, nil] the value currently held in @run_type
#   (nil when it has never been assigned)
def run_type
  instance_variable_get(:@run_type)
end

Instance Method Details

#_orchestrate_batch ⇒ Object



64
65
66
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 64

# Hands this step over to RemoteWorkflow::SSH.orchestrate_batch_job.
#
# The job URL is base_url joined with the task name; @batch_options is
# passed along, falling back to an empty Hash when it is unset.
#
# @return [Object] whatever RemoteWorkflow::SSH.orchestrate_batch_job returns
def _orchestrate_batch
  job_url = File.join(base_url, task.to_s)
  options = @batch_options || {}
  RemoteWorkflow::SSH.orchestrate_batch_job(job_url, @input_id, @base_name, options)
end

#_run ⇒ Object



54
55
56
57
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 54

# Uploads this step's dependencies and then launches the job through
# RemoteWorkflow::SSH.run_job.
#
# NOTE(review): the literal 'user' is passed as the third argument of
# upload_dependencies; its meaning is defined by that helper -- confirm there.
#
# @return [Object] whatever RemoteWorkflow::SSH.run_job returns
def _run
  RemoteWorkflow::SSH.upload_dependencies(self, @server, 'user', @produce_dependencies)

  job_url = File.join(base_url, task.to_s)
  RemoteWorkflow::SSH.run_job(job_url, @input_id, @base_name)
end

#_run_batch ⇒ Object



59
60
61
62
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 59

# Uploads this step's dependencies and then launches the job through
# RemoteWorkflow::SSH.run_batch_job, forwarding @batch_options (or an empty
# Hash when it is unset).
#
# NOTE(review): the literal 'user' is passed as the third argument of
# upload_dependencies; its meaning is defined by that helper -- confirm there.
#
# @return [Object] whatever RemoteWorkflow::SSH.run_batch_job returns
def _run_batch
  RemoteWorkflow::SSH.upload_dependencies(self, @server, 'user', @produce_dependencies)

  job_url = File.join(base_url, task.to_s)
  options = @batch_options || {}
  RemoteWorkflow::SSH.run_batch_job(job_url, @input_id, @base_name, options)
end

#abort ⇒ Object



110
111
112
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 110

# Placeholder: aborting is not implemented for this backend, so this only
# logs a warning through Log.warn and does nothing else.
#
# @return [Object] whatever Log.warn returns
def abort
  message = "not implemented RemoteWorkflow::SSH.abort(@url, @input_id, @base_name)"
  Log.warn(message)
end

#clean ⇒ Object



104
105
106
107
108
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 104

# Cleans the job through RemoteWorkflow::SSH.clean and then calls _restart.
#
# init_job runs first so that @url, @input_id and @base_name are populated
# before they are handed to RemoteWorkflow::SSH.clean.
#
# @return [Object] whatever _restart returns
def clean
  init_job
  remote_args = [@url, @input_id, @base_name]
  RemoteWorkflow::SSH.clean(*remote_args)
  _restart
end

#init_job(cache_type = nil, other_params = {}) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 5

# Prepares this step for execution: uploads its inputs, posts the job through
# RemoteWorkflow::SSH and records the resulting location in @remote_path,
# @name and @url. Returns immediately when @url is already set.
#
# @param cache_type [Symbol, nil] cache type forwarded to Persist.memory; when
#   nil it defaults to :exec if @is_exec is truthy and :asynchronous otherwise
# @param other_params [Hash] accepted for interface compatibility; not used by
#   this implementation
# @return [self]
def init_job(cache_type = nil, other_params = {})
  return self if @url

  cache_type = (@is_exec ? :exec : :asynchronous) if cache_type.nil?

  @last_info_time = nil
  @done = false
  @server, @server_path = RemoteWorkflow::SSH.parse_url(base_url)
  # Interpolation builds a fresh String, so this keeps working even if string
  # literals are frozen (the previous `"inputs-" << ...` mutated a literal).
  @input_id ||= "inputs-#{rand(100000)}"

  # Each override has the form "name=value"; it replaces the input of that name.
  if override_dependencies && override_dependencies.any?
    override_dependencies.each do |od|
      name, _sep, value = od.partition("=")
      inputs[name] = value
    end
  end

  # NOTE(review): this iterates `inputs` directly, while input_dependencies
  # goes through `inputs.values`; if `inputs` is a Hash this selection may
  # never match a Step -- confirm the intended collection type.
  inputs.select { |i| Step === i }.each { |i| i.produce }

  RemoteWorkflow::SSH.upload_inputs(@server, inputs, @input_types, @input_id)

  @remote_path ||= Persist.memory("RemoteStep", :workflow => self.workflow, :task => task, :jobname => @name, :inputs => inputs, :cache_type => cache_type) do
    Misc.insist do
      RemoteWorkflow::SSH.post_job(File.join(base_url, task.to_s), @input_id, @base_name)
    end
  end
  @name = @remote_path.split("/").last

  if Open.remote?(@name)
    @url = @name
    @name = File.basename(@name)
  else
    @url = File.join(base_url, task.to_s, @name)
  end

  self
end

#input_dependencies ⇒ Object



114
115
116
117
118
119
120
121
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 114

# Collects the steps this step's inputs depend on. The result is memoized in
# @input_dependencies.
#
# Two groups are concatenated, in this order:
# 1. input values that are themselves a Step (or a RemoteStep, when that
#    constant is defined);
# 2. the +resource+ of every input value that is a Path whose resource is a Step.
#
# Dependencies are included regardless of whether they have already started.
#
# @return [Array] the collected dependencies
def input_dependencies
  @input_dependencies ||= begin
    values = inputs.values.flatten

    direct_steps = values.select do |value|
      Step === value || (defined?(RemoteStep) && RemoteStep === value)
    end

    step_backed_paths = values.select do |value|
      Path === value && Step === value.resource
    end

    direct_steps + step_backed_paths.map { |path| path.resource }
  end
end

#issue ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 68

# Initializes the job and launches it according to @run_type, storing the
# launcher's result in @remote_path and marking the step as started.
#
# Dispatch on @run_type (String or Symbol forms are both accepted):
# * 'run' / :run / nil          -> _run
# * 'batch' / :batch            -> _run_batch
# * 'orchestrate' / :orchestrate -> _orchestrate_batch
#
# NOTE(review): any other @run_type launches nothing, leaves @remote_path nil
# and still sets @started -- confirm whether that should raise instead.
#
# @return [true]
def issue
  init_job
  @remote_path = case @run_type
                 when 'run', :run, nil
                   _run
                 when 'batch', :batch
                   _run_batch
                 when 'orchestrate', :orchestrate
                   _orchestrate_batch
                 end
  @started = true
end

#load ⇒ Object



91
92
93
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 91

# Opens this step's path with Open.open and passes the result to load_res.
#
# @return [Object] whatever load_res returns
def load
  stream = Open.open(path)
  load_res(stream)
end

#path ⇒ Object



43
44
45
46
47
48
49
50
51
52
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 43

# Builds the "ssh://<server>:<location>" string for this step.
#
# The location is, in order of preference: info[:path], then @remote_path,
# then "var/jobs/<workflow>/<task_name>/<@name>". As a side effect every call
# re-parses @base_url and overwrites @server and @server_path.
#
# @return [String]
def path
  @server, @server_path = RemoteWorkflow::SSH.parse_url(@base_url)

  location =
    if info[:path]
      info[:path]
    elsif @remote_path
      @remote_path
    else
      ["var/jobs", self.workflow.to_s, task_name.to_s, @name].join("/")
    end

  "ssh://" + @server + ":" + location
end

#produce(*args) ⇒ Object



82
83
84
85
86
87
88
89
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 82

# Issues the job and blocks, polling once per second, until the step reports
# done?, error? or aborted?.
#
# @param args [Array] accepted but not used by this implementation
# @raise [Exception] the object returned by get_exception when error? is true
# @return [self]
def produce(*args)
  issue
  sleep 1 until done? || error? || aborted?
  raise self.get_exception if error?
  self
end

#run(stream = nil) ⇒ Object



95
96
97
98
99
100
101
102
# File 'lib/rbbt/workflow/remote_workflow/remote_step/ssh.rb', line 95

# Runs the step. With a truthy +stream+ the job is only issued; otherwise it
# is produced (blocking until it finishes) and its result loaded.
#
# NOTE(review): `args` is not a parameter or local of this method, so it must
# resolve to a method defined elsewhere -- confirm it exists, or whether
# `stream` was intended.
#
# @param stream [Object, nil] truthy to issue the job without waiting for it
# @return [Object] the result of issue when streaming; otherwise the result
#   of load, or nil when args.first is truthy
def run(stream = nil)
  return issue if stream

  produce
  self.load unless args.first
end