Class: WorkflowRESTClient
- Inherits:
-
Object
- Object
- WorkflowRESTClient
show all
- Includes:
- Workflow
- Defined in:
- lib/rbbt/rest/client.rb,
lib/rbbt/rest/client/get.rb,
lib/rbbt/rest/client/step.rb,
lib/rbbt/rest/client/adaptor.rb
Defined Under Namespace
Classes: RemoteStep
Constant Summary
Constants included
from Workflow
Workflow::DEBUG_JOB_HASH, Workflow::DEFAULT_NAME, Workflow::FORGET_DEP_TASKS, Workflow::LOAD_STEP_CACHE, Workflow::STEP_CACHE, Workflow::TAG
Instance Attribute Summary collapse
Attributes included from Workflow
#description, #example_dir, #helpers, #last_task, #libdir, #load_step_cache, #remote_tasks, #step_cache, #task_description, #workdir
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Workflow
#SOPT_str, #__job, __load_step, #_job, _load_step, #add_remote_tasks, #all_exports, #assign_dep_inputs, #dep, #dep_task, #desc, #doc, doc_parse_chunks, doc_parse_first_line, doc_parse_up_to, #documentation_markdown, #example, #example_inputs, #example_step, #examples, #export_asynchronous, #export_exec, #export_stream, #export_synchronous, extended, #extension, #fast_load_id, fast_load_step, #get_SOPT, get_SOPT, #get_job_step, #helper, #id_for, #import, #import_task, init_remote_tasks, installed_workflows, #jobs, #load_documentation, load_inputs, #load_name, load_remote_tasks, load_step, #load_step, load_step_cache, load_workflow_file, load_workflow_libdir, #local_persist_setup, #local_workdir_setup, local_workflow_filename, #log, #make_local, #override_dependencies, parse_workflow_doc, process_remote_tasks, #real_dependencies, #rec_dependencies, #rec_input_defaults, #rec_input_descriptions, #rec_input_options, #rec_input_types, #rec_input_use, #rec_inputs, relocate, relocate_array, relocate_dependency, require_local_workflow, require_remote_workflow, require_workflow, resolve_locals, #returns, #set_step_dependencies, #setup_override_dependency, #step_module, #step_path, #task, #task_exports, #task_for, #task_from_dep, transplant, #unexport, #with_workdir, workdir, workdir=, workflow_for
#input
Constructor Details
Returns a new instance of WorkflowRESTClient.
16
17
18
19
20
|
# File 'lib/rbbt/rest/client.rb', line 16
def initialize(url, name)
Log.debug{ "Loading remote workflow #{ name }: #{ url }" }
@url, @name = url, name
init_remote_tasks
end
|
Instance Attribute Details
#asynchronous_exports ⇒ Object
Returns the value of attribute asynchronous_exports.
14
15
16
|
# File 'lib/rbbt/rest/client.rb', line 14
def asynchronous_exports
@asynchronous_exports
end
|
#exec_exports ⇒ Object
Returns the value of attribute exec_exports.
14
15
16
|
# File 'lib/rbbt/rest/client.rb', line 14
def exec_exports
@exec_exports
end
|
#name ⇒ Object
Returns the value of attribute name.
14
15
16
|
# File 'lib/rbbt/rest/client.rb', line 14
def name
@name
end
|
#stream_exports ⇒ Object
Returns the value of attribute stream_exports.
14
15
16
|
# File 'lib/rbbt/rest/client.rb', line 14
def stream_exports
@stream_exports
end
|
#synchronous_exports ⇒ Object
Returns the value of attribute synchronous_exports.
14
15
16
|
# File 'lib/rbbt/rest/client.rb', line 14
def synchronous_exports
@synchronous_exports
end
|
#url ⇒ Object
Returns the value of attribute url.
14
15
16
|
# File 'lib/rbbt/rest/client.rb', line 14
def url
@url
end
|
Class Method Details
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
# File 'lib/rbbt/rest/client/step.rb', line 3
def self.__prepare_inputs_for_restclient(inputs)
inputs.each do |k,v|
if v.respond_to? :path and not v.respond_to? :original_filename
class << v
def original_filename
File.expand_path(path)
end
end
end
if Array === v and v.empty?
inputs[k] = "EMPTY_ARRAY"
end
end
end
|
.capture_exception ⇒ Object
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
# File 'lib/rbbt/rest/client/get.rb', line 37
def self.capture_exception
begin
yield
rescue Exception => e
raise e unless e.respond_to? :response
begin
klass, message = e.response.to_s.split " => "
klass = Kernel.const_get klass
raise klass.new message
rescue
raise e
end
raise $!
end
end
|
.clean_url(url, params = {}) ⇒ Object
65
66
67
68
69
70
71
72
73
74
75
76
77
|
# File 'lib/rbbt/rest/client/get.rb', line 65
def self.clean_url(url, params = {})
params = params.merge({ :_format => 'json', :update => 'clean' })
params = fix_params params
res = capture_exception do
Misc.insist(2, 0.5) do
Log.debug{ "RestClient clean: #{ url } - #{Misc.fingerprint params}" }
res = RestClient.get(self.encode(url), :params => params)
raise TryAgain if res.code == 202
res
end
end
res
end
|
.encode(url) ⇒ Object
2
3
4
5
6
7
8
9
|
# File 'lib/rbbt/rest/client/get.rb', line 2
def self.encode(url)
begin
URI.encode(url)
rescue
Log.warn $!.message
url
end
end
|
.fix_hash(hash, fix_values = false) ⇒ Object
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
# File 'lib/rbbt/rest/client/get.rb', line 11
def self.fix_hash(hash, fix_values = false)
fixed = {}
hash.each do |key, value|
fixed[key.to_sym] = case value
when TrueClass
value
when FalseClass
value
when Hash
fix_hash(value)
when (fix_values and String )
value.to_sym
when IO
value.read
when TSV::Dumper
value.stream
when Step
stream = get_stream(value)
stream || value.load
else
value
end
end
fixed
end
|
.fix_params(params) ⇒ Object
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/rbbt/rest/client/get.rb', line 53
def self.fix_params(params)
new_params = {}
params.each do |k,v|
if Array === v and v.empty?
new_params[k] = "EMPTY_ARRAY"
else
new_params[k] = v
end
end
new_params
end
|
.get_json(url, params = {}) ⇒ Object
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
# File 'lib/rbbt/rest/client/get.rb', line 94
def self.get_json(url, params = {})
Log.debug{ "RestClient get_json: #{ url } - #{Misc.fingerprint params }" }
params = params.merge({ :_format => 'json' })
params = fix_params params
res = capture_exception do
Misc.insist(2, 0.5) do
RestClient.get(self.encode(url), :params => params)
end
end
begin
JSON.parse(res)
rescue
res
end
end
|
.get_raw(url, params = {}) ⇒ Object
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
# File 'lib/rbbt/rest/client/get.rb', line 79
def self.get_raw(url, params = {})
params = params.merge({ :_format => 'raw' })
params = fix_params params
res = capture_exception do
Misc.insist(2, 0.5) do
raise "No url" if url.nil?
Log.debug{ "RestClient get_raw: #{ url } - #{Misc.fingerprint params}" }
res = RestClient.get(self.encode(url), :params => params)
raise TryAgain if res.code == 202
res.to_s
end
end
res
end
|
.post_jobname(url, params = {}) ⇒ Object
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
# File 'lib/rbbt/rest/client/get.rb', line 112
def self.post_jobname(url, params = {})
Log.debug{ "RestClient post_jobname: #{ url } - #{Misc.fingerprint params}" }
params = params.merge({ :_format => 'jobname' })
params = fix_params params
WorkflowRESTClient.__prepare_inputs_for_restclient(params)
name = capture_exception do
RestClient.post(self.encode(url), params)
end
Log.debug{ "RestClient jobname returned for #{ url } - #{Misc.fingerprint params}: #{name}" }
name
end
|
.post_json(url, params = {}) ⇒ Object
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
# File 'lib/rbbt/rest/client/get.rb', line 127
def self.post_json(url, params = {})
if url =~ /_cache_type=:exec/
JSON.parse(Open.open(url, :nocache => true))
else
params = params.merge({ :_format => 'json' })
params = fix_params params
res = capture_exception do
RestClient.post(self.encode(url), params)
end
begin
JSON.parse(res)
rescue
res
end
end
end
|
Instance Method Details
#exported_tasks ⇒ Object
29
30
31
|
# File 'lib/rbbt/rest/client/adaptor.rb', line 29
def exported_tasks
(@asynchronous_exports + @synchronous_exports + @exec_exports).compact.flatten
end
|
#init_remote_tasks ⇒ Object
59
60
61
62
63
64
65
66
|
# File 'lib/rbbt/rest/client/adaptor.rb', line 59
def init_remote_tasks
task_exports = WorkflowRESTClient.get_json(url)
@asynchronous_exports = task_exports["asynchronous"].collect{|task| task.to_sym }
@synchronous_exports = task_exports["synchronous"].collect{|task| task.to_sym }
@exec_exports = task_exports["exec"].collect{|task| task.to_sym }
@stream_exports = task_exports["stream"].collect{|task| task.to_sym }
@can_stream = task_exports["can_stream"]
end
|
#job(task, name, inputs) ⇒ Object
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
# File 'lib/rbbt/rest/client.rb', line 26
def job(task, name, inputs)
task_info = task_info(task)
fixed_inputs = {}
input_types = task_info[:input_types]
inputs.each do |k,v|
k = k.to_sym
if TSV === v
fixed_inputs[k] = v.to_s
else
next if input_types[k].nil?
case input_types[k].to_sym
when :tsv, :array, :file, :text
fixed_inputs[k] = (String === v and Open.exists?(v)) ? Open.open(v) : v
else
fixed_inputs[k] = v
end
end
end
stream_input = @can_stream ? task_info(task)[:input_options].select{|k,o| o[:stream] }.collect{|k,o| k }.first : nil
RemoteStep.new(url, task, name, fixed_inputs, task_info[:result_type], task_info[:result_description], @exec_exports.include?(task), @stream_exports.include?(task), stream_input)
end
|
#load_id(id) ⇒ Object
51
52
53
54
55
56
57
58
|
# File 'lib/rbbt/rest/client.rb', line 51
def load_id(id)
task, name = id.split("/")
step = RemoteStep.new url, task, nil
step.name = name
step.result_type = task_info(task)[:result_type]
step.result_description = task_info(task)[:result_description]
step
end
|
#load_tasks ⇒ Object
44
45
46
47
|
# File 'lib/rbbt/rest/client/adaptor.rb', line 44
def load_tasks
exported_tasks.each{|name| tasks[name]}
nil
end
|
#task_dependencies ⇒ Object
49
50
51
52
53
54
55
56
57
|
# File 'lib/rbbt/rest/client/adaptor.rb', line 49
def task_dependencies
@task_dependencies ||= Hash.new do |hash,task|
hash[task] = if exported_tasks.include? task
WorkflowRESTClient.get_json(File.join(url, task.to_s, 'dependencies'))
else
[]
end
end
end
|
#task_info(task) ⇒ Object
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
# File 'lib/rbbt/rest/client/adaptor.rb', line 11
def task_info(task)
@task_info ||= {}
@task_info[task]
if @task_info[task].nil?
task_info = WorkflowRESTClient.get_json(File.join(url, task.to_s, 'info'))
task_info = WorkflowRESTClient.fix_hash(task_info)
task_info[:result_type] = task_info[:result_type].to_sym
task_info[:export] = task_info[:export].to_sym
task_info[:input_types] = WorkflowRESTClient.fix_hash(task_info[:input_types], true)
task_info[:inputs] = task_info[:inputs].collect{|input| input.to_sym }
@task_info[task] = task_info
end
@task_info[task]
end
|
#tasks ⇒ Object
33
34
35
36
37
38
39
40
41
42
|
# File 'lib/rbbt/rest/client/adaptor.rb', line 33
def tasks
@tasks ||= Hash.new do |hash,task_name|
info = task_info(task_name)
task = Task.setup info do |*args|
raise "This is a remote task"
end
task.name = task_name.to_sym
hash[task_name] = task
end
end
|
#to_s ⇒ Object
22
23
24
|
# File 'lib/rbbt/rest/client.rb', line 22
def to_s
name
end
|
#workflow_description ⇒ Object
3
4
5
|
# File 'lib/rbbt/rest/client/adaptor.rb', line 3
def workflow_description
WorkflowRESTClient.get_raw(File.join(url, 'description'))
end
|