Class: WorkflowRESTClient::RemoteStep
- Inherits:
-
Step
- Object
- Step
- WorkflowRESTClient::RemoteStep
show all
- Defined in:
- lib/rbbt/rest/client/step.rb
Constant Summary
Constants inherited
from Step
Step::INFO_SERIALIAZER, Step::STREAM_CACHE, Step::STREAM_CACHE_MUTEX
Instance Attribute Summary collapse
Attributes inherited from Step
#bindings, #clean_name, #dependencies, #dupped, #mutex, #pid, #result, #saved_stream, #seen, #stream
Class Method Summary
collapse
Instance Method Summary
collapse
Methods inherited from Step
#_abort, #_exec, #abort, #abort_pid, #abort_stream, #aborted?, #checks, #child, clean, #dup_inputs, dup_stream, #error?, #exception, #files_dir, files_dir, #get_stream, #grace, #info_file, info_file, #info_lock, job_name_for_info_file, #join_stream, #kill_children, #load_file, #log, log, log_block, log_progress, #log_progress, log_string, #merge_info, #message, #messages, #prepare_result, #progress_bar, #provenance, #provenance_paths, purge_stream_cache, #rec_dependencies, #relay_log, #run_dependencies, #save_file, #set_info, #soft_grace, #started?, started?, #status=, status_color, #step, step_info, #stop_dependencies, #streaming?, wait_for_jobs
Constructor Details
#initialize(base_url, task = nil, base_name = nil, inputs = nil, result_type = nil, result_description = nil, is_exec = false) ⇒ RemoteStep
Returns a new instance of RemoteStep.
14
15
16
17
18
|
# File 'lib/rbbt/rest/client/step.rb', line 14
# Builds a handle for a job served by a remote workflow server.
#
# Every argument is stored in the instance variable of the same name, a
# fresh Mutex is created (used by #run), and the inputs are passed through
# RemoteStep.get_streams, which replaces Step values in place.
#
# @param base_url [String] root URL of the remote workflow
# @param task [Object, nil] task identifier; converted with to_s when URLs are built
# @param base_name [String, nil] job name suggested to the server
# @param inputs [Hash, nil] job inputs
# @param result_type [Symbol, nil] selects how results are parsed (see #load_res)
# @param result_description [Object, nil] stored as-is
# @param is_exec [Boolean] when true, #run goes through #exec_job
def initialize(base_url, task = nil, base_name = nil, inputs = nil, result_type = nil, result_description = nil, is_exec = false)
  @base_url = base_url
  @task = task
  @base_name = base_name
  @inputs = inputs
  @result_type = result_type
  @result_description = result_description
  @is_exec = is_exec

  @mutex = Mutex.new
  RemoteStep.get_streams @inputs
end
|
Instance Attribute Details
#base_name ⇒ Object
Returns the value of attribute base_name.
4
5
6
|
# File 'lib/rbbt/rest/client/step.rb', line 4
# Reader for @base_name, the job name given to the constructor. init_job
# sends it as the :jobname parameter when no @name has been assigned yet.
def base_name
@base_name
end
|
#base_url ⇒ Object
Returns the value of attribute base_url.
4
5
6
|
# File 'lib/rbbt/rest/client/step.rb', line 4
# Reader for @base_url, the URL prefix given to the constructor. Task and
# job URLs are built by joining path segments onto it.
def base_url
@base_url
end
|
#inputs ⇒ Object
Returns the value of attribute inputs.
4
5
6
|
# File 'lib/rbbt/rest/client/step.rb', line 4
# Reader for @inputs, the inputs object given to the constructor. The same
# object is returned on every call, so #clean and #recursive_clean modify
# it in place when they set :_update.
def inputs
@inputs
end
|
#is_exec ⇒ Object
Returns the value of attribute is_exec.
4
5
6
|
# File 'lib/rbbt/rest/client/step.rb', line 4
# Reader for @is_exec, the flag given to the constructor. When truthy,
# #run uses #exec_job and #name returns nil.
def is_exec
@is_exec
end
|
#result_description ⇒ Object
Returns the value of attribute result_description.
4
5
6
|
# File 'lib/rbbt/rest/client/step.rb', line 4
# Reader for @result_description, stored unchanged by the constructor.
def result_description
@result_description
end
|
#result_type ⇒ Object
Returns the value of attribute result_type.
4
5
6
|
# File 'lib/rbbt/rest/client/step.rb', line 4
# Reader for @result_type as given to the constructor (nil by default).
# #load_res uses it to choose a parser, and #get / #exec_job use it to
# choose the :_format request parameter.
def result_type
@result_type
end
|
#task ⇒ Object
Returns the value of attribute task.
4
5
6
|
# File 'lib/rbbt/rest/client/step.rb', line 4
# Reader for @task as given to the constructor. It is converted with to_s
# wherever it becomes part of a URL.
def task
@task
end
|
#url ⇒ Object
Returns the value of attribute url.
4
5
6
|
# File 'lib/rbbt/rest/client/step.rb', line 4
# Reader for @url. In the code shown here @url is only assigned by
# #init_job, so this is nil until a job has been initialised.
def url
@url
end
|
Class Method Details
.get_streams(inputs) ⇒ Object
6
7
8
9
10
11
12
13
|
# File 'lib/rbbt/rest/client/step.rb', line 6
# Replaces every Step found among the input values with data that can be
# sent to the server: the step's stream when get_stream returns one,
# otherwise its loaded result. The hash is modified in place.
#
# @param inputs [Hash, nil] job inputs; nil is treated as "no inputs"
# @return [Hash, nil] the object that was passed in
def self.get_streams(inputs)
  # The constructor defaults inputs to nil; calling each on nil would raise.
  return inputs if inputs.nil?

  inputs.each do |key, value|
    next unless Step === value

    # Only existing keys are reassigned, which is allowed during iteration.
    inputs[key] = value.get_stream || value.load
  end
end
|
Instance Method Details
#clean ⇒ Object
156
157
158
159
160
161
162
|
# File 'lib/rbbt/rest/client/step.rb', line 156
# Marks the job to be cleaned by setting the :_update input to :clean.
# Nothing is sent here; the flag travels with the inputs on later requests.
#
# @return [self]
def clean
  begin
    inputs[:_update] = :clean
  rescue StandardError
    # Best effort: missing or unmodifiable inputs are ignored. Only
    # StandardError is rescued so that Interrupt, SystemExit and similar
    # are not swallowed.
  end
  self
end
|
#done? ⇒ Boolean
47
48
49
|
# File 'lib/rbbt/rest/client/step.rb', line 47
# Tells whether the job has finished successfully.
#
# A truthy @done short-circuits the check; otherwise the current status is
# read through #status and compared with 'done'.
#
# @return [Object] @done when it is truthy, otherwise true or false
def done?
  return @done if @done

  'done' == status.to_s
end
|
#exec(*args) ⇒ Object
138
139
140
|
# File 'lib/rbbt/rest/client/step.rb', line 138
# Runs the job through #exec_job and returns its result. Any arguments are
# accepted but not used.
def exec(*args)
exec_job
end
|
#exec_job ⇒ Object
105
106
107
108
109
110
|
# File 'lib/rbbt/rest/client/step.rb', line 105
# Posts the inputs to the task URL with the :exec cache type and parses the
# response with #load_res.
#
# The :_format parameter is :raw for the string, boolean, tsv and
# annotations result types and :json for everything else. An :array result
# is therefore requested as JSON and also parsed as JSON.
#
# NOTE(review): URI.encode was deprecated and later removed from Ruby's
# standard library (3.0); confirm which Ruby versions this must run on.
def exec_job
  res = WorkflowRESTClient.capture_exception do
    endpoint = URI.encode(File.join(base_url, task.to_s))
    format = [:string, :boolean, :tsv, :annotations].include?(result_type) ? :raw : :json
    RestClient.post(endpoint, inputs.merge(:_cache_type => :exec, :_format => format))
  end

  parse_as = result_type == :array ? :json : result_type
  load_res(res, parse_as)
end
|
#file(file) ⇒ Object
55
56
57
|
# File 'lib/rbbt/rest/client/step.rb', line 55
# Fetches one of the job's files from the server.
#
# @param file [String] file name, appended to "<url>/file/"
# @return [Object] whatever WorkflowRESTClient.get_raw returns for that URL
def file(file)
  file_url = File.join(url, 'file', file)
  WorkflowRESTClient.get_raw(file_url)
end
|
#fork ⇒ Object
112
113
114
|
# File 'lib/rbbt/rest/client/step.rb', line 112
# Initialises the job through #init_job with the :asynchronous cache type
# and returns whatever #init_job returns.
def fork
init_job(:asynchronous)
end
|
#get ⇒ Object
87
88
89
90
91
92
93
94
95
96
97
98
|
# File 'lib/rbbt/rest/client/step.rb', line 87
# Downloads the job result from #url.
#
# The :_format parameter is :raw for the string, boolean, tsv, annotations
# and array result types and :json for anything else, including a nil
# result type.
#
# @return [Object] whatever WorkflowRESTClient.get_raw returns
def get
  # result_type is nil unless one was given to the constructor, and nil has
  # no to_sym; only convert values that support it.
  type = result_type
  type = type.to_sym if type.respond_to?(:to_sym)

  raw_types = [:string, :boolean, :tsv, :annotations, :array]
  params = { :_format => raw_types.include?(type) ? :raw : :json }

  # NOTE(review): Misc.insist presumably retries the block (3 attempts with
  # a 1-2 second pause) - confirm against its definition.
  Misc.insist 3, rand(2) + 1 do
    begin
      WorkflowRESTClient.get_raw(url, params)
    rescue
      # Log each failure, then re-raise the same exception.
      Log.exception $!
      raise
    end
  end
end
|
#info(check_lock = false) ⇒ Object
29
30
31
32
33
34
35
36
37
|
# File 'lib/rbbt/rest/client/step.rb', line 29
# Returns the job's info hash, fetching it from "<url>/info" when no copy
# is cached in @info. The job is initialised first if it has no url yet.
#
# The fetched hash goes through WorkflowRESTClient.fix_hash, and a String
# :status is converted to a Symbol. Note that #status clears @info after
# every call.
#
# @param check_lock [Boolean] accepted but not used here
# @return [Hash] the info hash
def info(check_lock = false)
  return @info if @info

  init_job unless url
  fetched = WorkflowRESTClient.get_json(File.join(url, 'info'))
  fetched = WorkflowRESTClient.fix_hash(fetched)

  current = fetched[:status]
  fetched[:status] = current.to_sym if String === current

  @info = fetched
end
|
#init_job(cache_type = :asynchronous) ⇒ Object
61
62
63
64
65
66
67
|
# File 'lib/rbbt/rest/client/step.rb', line 61
# Ensures the job has a name and sets @url to "<base_url>/<task>/<name>".
#
# When @name is not set yet it is obtained via Persist.memory, whose block
# posts the inputs (plus :jobname and :_cache_type) to the task URL through
# WorkflowRESTClient.post_jobname. Later calls reuse @name.
#
# @param cache_type [Symbol] sent as the :_cache_type parameter
# @return [nil]
def init_job(cache_type = :asynchronous)
  unless @name
    @name = Persist.memory("RemoteSteps", :jobname => @name, :inputs => inputs) do
      task_url = File.join(base_url, task.to_s)
      payload = inputs.merge(:jobname => @name || @base_name, :_cache_type => cache_type)
      WorkflowRESTClient.post_jobname(task_url, payload)
    end
  end

  @url = File.join(base_url, task.to_s, @name)
  nil
end
|
#join ⇒ Object
142
143
144
145
146
|
# File 'lib/rbbt/rest/client/step.rb', line 142
# Waits for the job by loading its result, unless it is already done.
#
# Always returns self so calls can be chained. The early-exit path used to
# return nil while the other path returned self.
#
# @return [self]
def join
  self.load unless done?
  self
end
|
#load ⇒ Object
100
101
102
103
|
# File 'lib/rbbt/rest/client/step.rb', line 100
# Downloads the job result with #get and parses it with #load_res using
# the step's own result type.
#
# @return [Object] the parsed result
def load
  load_res get
end
|
#load_res(res, result_type = nil) ⇒ Object
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
# File 'lib/rbbt/rest/client/step.rb', line 69
# Converts a raw response body into a Ruby value according to the result
# type.
#
#   :string       the body itself
#   :boolean      true only when the body is exactly "true"
#   :tsv          a TSV opened over the body
#   :annotations  Annotated.load_tsv applied to that TSV
#   :array        the body split on newlines
#   anything else the body parsed as JSON
#
# @param res [String] response body
# @param result_type [Symbol, nil] overrides the step's own result type
# @return [Object] the converted value
def load_res(res, result_type = nil)
  kind = result_type || self.result_type

  case kind
  when :string      then res
  when :boolean     then res == "true"
  when :tsv         then TSV.open(StringIO.new(res))
  when :annotations then Annotated.load_tsv(TSV.open(StringIO.new(res)))
  when :array       then res.split("\n")
  else JSON.parse res
  end
end
|
#name ⇒ Object
20
21
22
23
|
# File 'lib/rbbt/rest/client/step.rb', line 20
# Job name, taken from the last path segment of the job URL.
#
# Returns nil for exec jobs and, instead of raising NoMethodError on nil,
# also when no URL has been assigned yet (#init_job is what sets @url).
#
# @return [String, nil]
def name
  return nil if @is_exec

  # @url may be an Array, in which case its first element is used.
  location = Array === @url ? @url.first : @url
  return nil if location.nil?

  location.split("/").last
end
|
#path ⇒ Object
120
121
122
|
# File 'lib/rbbt/rest/client/step.rb', line 120
# Returns the value of #url. #run returns this when called with a truthy
# noload argument.
def path
url
end
|
#recursive_clean ⇒ Object
148
149
150
151
152
153
154
|
# File 'lib/rbbt/rest/client/step.rb', line 148
# Marks the job to be cleaned recursively by setting the :_update input to
# :recursive_clean. Nothing is sent here; the flag travels with the inputs
# on later requests.
#
# @return [self]
def recursive_clean
  begin
    inputs[:_update] = :recursive_clean
  rescue StandardError
    # Best effort: missing or unmodifiable inputs are ignored. Only
    # StandardError is rescued so that Interrupt, SystemExit and similar
    # are not swallowed.
  end
  self
end
|
#run(noload = false) ⇒ Object
124
125
126
127
128
129
130
131
132
133
134
135
136
|
# File 'lib/rbbt/rest/client/step.rb', line 124
# Runs the job and caches its result in @result, holding @mutex while the
# work happens.
#
# Exec jobs go through #exec_job; other jobs are initialised with the
# :synchronous cache type and then loaded. Once @result is truthy, later
# calls reuse it.
#
# @param noload [Boolean] when truthy, return #path instead of the result
#   (the result is still computed and cached)
# @return [Object] the cached result, or #path when noload is truthy
def run(noload = false)
  @mutex.synchronize do
    unless @result
      @result =
        if @is_exec
          exec_job
        else
          init_job(:synchronous)
          self.load
        end
    end
  end

  noload ? path : @result
end
|
#running? ⇒ Boolean
116
117
118
|
# File 'lib/rbbt/rest/client/step.rb', line 116
# Tells whether the job is still in progress, meaning its current status
# is none of done, error or aborted.
#
# @return [Boolean]
def running?
  finished_states = %w(done error aborted)
  current = status.to_s
  !finished_states.include?(current)
end
|
#status ⇒ Object
39
40
41
42
43
44
45
|
# File 'lib/rbbt/rest/client/step.rb', line 39
# Returns the :status entry of the job's info hash.
#
# The cached info is dropped afterwards, even when reading it raised, so
# the next #info call fetches a fresh copy.
#
# @return [Object] the value stored under :status
def status
  info[:status]
ensure
  @info = nil
end
|
#task_name ⇒ Object
25
26
27
|
# File 'lib/rbbt/rest/client/step.rb', line 25
def task_name
(Array === @url ? @url.first : @url).split("/")[-2]
end
|