Class: WorkflowRESTClient::RemoteStep

Inherits:
Step
  • Object
show all
Defined in:
lib/rbbt/rest/client/step.rb

Constant Summary

Constants inherited from Step

Step::INFO_SERIALIAZER, Step::STREAM_CACHE, Step::STREAM_CACHE_MUTEX

Instance Attribute Summary collapse

Attributes inherited from Step

#bindings, #clean_name, #dependencies, #dupped, #mutex, #pid, #result, #saved_stream, #seen, #stream

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Step

#_abort, #_exec, #abort, #abort_pid, #abort_stream, #aborted?, #checks, #child, clean, #dirty?, #dup_inputs, dup_stream, #error?, #exception, files_dir, #files_dir, #get_stream, #grace, #info_file, info_file, #info_lock, job_name_for_info_file, #join_stream, #kill_children, #load_file, #log, log, log_block, log_progress, #log_progress, log_string, #merge_info, #message, #messages, #prepare_result, #progress_bar, #provenance, #provenance_paths, purge_stream_cache, #rec_dependencies, #relay_log, #run_dependencies, #save_file, #set_info, #soft_grace, #started?, started?, #status=, status_color, #step, step_info, #stop_dependencies, #streaming?, wait_for_jobs

Constructor Details

#initialize(base_url, task = nil, base_name = nil, inputs = nil, result_type = nil, result_description = nil, is_exec = false) ⇒ RemoteStep

Returns a new instance of RemoteStep.



14
15
16
17
18
# File 'lib/rbbt/rest/client/step.rb', line 14

def initialize(base_url, task = nil, base_name = nil, inputs = nil, result_type = nil, result_description = nil, is_exec = false)
  @base_url, @task, @base_name, @inputs, @result_type, @result_description, @is_exec = base_url, task, base_name, inputs, result_type, result_description, is_exec
  @mutex = Mutex.new
  RemoteStep.get_streams @inputs
end

Instance Attribute Details

#base_nameObject

Returns the value of attribute base_name.



4
5
6
# File 'lib/rbbt/rest/client/step.rb', line 4

def base_name
  @base_name
end

#base_urlObject

Returns the value of attribute base_url.



4
5
6
# File 'lib/rbbt/rest/client/step.rb', line 4

def base_url
  @base_url
end

#inputsObject

Returns the value of attribute inputs.



4
5
6
# File 'lib/rbbt/rest/client/step.rb', line 4

def inputs
  @inputs
end

#is_execObject

Returns the value of attribute is_exec.



4
5
6
# File 'lib/rbbt/rest/client/step.rb', line 4

def is_exec
  @is_exec
end

#result_descriptionObject

Returns the value of attribute result_description.



4
5
6
# File 'lib/rbbt/rest/client/step.rb', line 4

def result_description
  @result_description
end

#result_typeObject

Returns the value of attribute result_type.



4
5
6
# File 'lib/rbbt/rest/client/step.rb', line 4

def result_type
  @result_type
end

#taskObject

Returns the value of attribute task.



4
5
6
# File 'lib/rbbt/rest/client/step.rb', line 4

def task
  @task
end

#urlObject

Returns the value of attribute url.



4
5
6
# File 'lib/rbbt/rest/client/step.rb', line 4

def url
  @url
end

Class Method Details

.get_streams(inputs) ⇒ Object



6
7
8
9
10
11
12
13
# File 'lib/rbbt/rest/client/step.rb', line 6

def self.get_streams(inputs)
  inputs.each do |k,v|
    if Step === v
      stream = v.get_stream
      inputs[k] = stream || v.load
    end
  end
end

Instance Method Details

#cleanObject



156
157
158
159
160
161
162
# File 'lib/rbbt/rest/client/step.rb', line 156

def clean
  begin
    inputs[:_update] = :clean
  rescue Exception
  end
  self
end

#done?Boolean

Returns:

  • (Boolean)


47
48
49
# File 'lib/rbbt/rest/client/step.rb', line 47

def done?
  @done || status.to_s == 'done'
end

#exec(*args) ⇒ Object



138
139
140
# File 'lib/rbbt/rest/client/step.rb', line 138

def exec(*args)
  exec_job
end

#exec_jobObject



105
106
107
108
109
110
# File 'lib/rbbt/rest/client/step.rb', line 105

def exec_job
  res = WorkflowRESTClient.capture_exception do
    RestClient.post(URI.encode(File.join(base_url, task.to_s)), inputs.merge(:_cache_type => :exec, :_format => [:string, :boolean, :tsv, :annotations].include?(result_type) ? :raw : :json))
  end
  load_res res, result_type == :array ? :json : result_type
end

#file(file) ⇒ Object



55
56
57
# File 'lib/rbbt/rest/client/step.rb', line 55

def file(file)
  WorkflowRESTClient.get_raw(File.join(url, 'file', file))
end

#filesObject



51
52
53
# File 'lib/rbbt/rest/client/step.rb', line 51

def files
  WorkflowRESTClient.get_json(File.join(url, 'files'))
end

#forkObject



112
113
114
# File 'lib/rbbt/rest/client/step.rb', line 112

def fork
  init_job(:asynchronous)
end

#getObject



87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/rbbt/rest/client/step.rb', line 87

def get
  params ||= {}
  params = params.merge(:_format => [:string, :boolean, :tsv, :annotations,:array].include?(result_type.to_sym) ? :raw : :json )
  Misc.insist 3, rand(2) + 1 do
    begin
      WorkflowRESTClient.get_raw(url, params)
    rescue
      Log.exception $!
      raise $!
    end
  end
end

#info(check_lock = false) ⇒ Object



29
30
31
32
33
34
35
36
37
# File 'lib/rbbt/rest/client/step.rb', line 29

def info(check_lock=false)
  @info ||= begin
              init_job unless url
              info = WorkflowRESTClient.get_json(File.join(url, 'info'))
              info = WorkflowRESTClient.fix_hash(info)
              info[:status] = info[:status].to_sym if String === info[:status]
              info
            end
end

#init_job(cache_type = :asynchronous) ⇒ Object

{{{ MANAGEMENT



61
62
63
64
65
66
67
# File 'lib/rbbt/rest/client/step.rb', line 61

def init_job(cache_type = :asynchronous)
  @name ||= Persist.memory("RemoteSteps", :jobname => @name, :inputs => inputs) do
    WorkflowRESTClient.post_jobname(File.join(base_url, task.to_s), inputs.merge(:jobname => @name||@base_name, :_cache_type => cache_type))
  end
  @url = File.join(base_url, task.to_s, @name)
  nil
end

#joinObject



142
143
144
145
146
# File 'lib/rbbt/rest/client/step.rb', line 142

def join
  return if self.done?
  self.load
  self
end

#loadObject



100
101
102
103
# File 'lib/rbbt/rest/client/step.rb', line 100

def load
  params = {}
  load_res get
end

#load_res(res, result_type = nil) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/rbbt/rest/client/step.rb', line 69

def load_res(res, result_type = nil)
  result_type ||= self.result_type
  case result_type
  when :string
    res
  when :boolean
    res == "true"
  when :tsv
    TSV.open(StringIO.new(res))
  when :annotations
    Annotated.load_tsv(TSV.open(StringIO.new(res)))
  when :array
    res.split("\n")
  else
    JSON.parse res
  end
end

#nameObject



20
21
22
23
# File 'lib/rbbt/rest/client/step.rb', line 20

def name
  return nil if @is_exec
  (Array === @url ? @url.first : @url).split("/").last
end

#pathObject



120
121
122
# File 'lib/rbbt/rest/client/step.rb', line 120

def path
  url
end

#recursive_cleanObject



148
149
150
151
152
153
154
# File 'lib/rbbt/rest/client/step.rb', line 148

def recursive_clean
  begin
    inputs[:_update] = :recursive_clean
  rescue Exception
  end
  self
end

#run(noload = false) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/rbbt/rest/client/step.rb', line 124

def run(noload = false)
  @mutex.synchronize do
    @result ||= begin
                  if @is_exec
                    exec_job 
                  else
                    init_job(:synchronous) 
                    self.load
                  end
                end
  end
  noload ? path : @result
end

#running?Boolean

Returns:

  • (Boolean)


116
117
118
# File 'lib/rbbt/rest/client/step.rb', line 116

def running?
  ! %w(done error aborted).include? status.to_s
end

#statusObject



39
40
41
42
43
44
45
# File 'lib/rbbt/rest/client/step.rb', line 39

def status
  begin
    info[:status]
  ensure
    @info = nil
  end
end

#task_nameObject



25
26
27
# File 'lib/rbbt/rest/client/step.rb', line 25

def task_name
  (Array === @url ? @url.first : @url).split("/")[-2]
end