Class: StreamWorkflowTask
Constant Summary
collapse
- EOL =
"\r\n"
RbbtRESTHelpers::MEMORY_CACHE, RbbtRESTHelpers::PAGE_SIZE
Instance Attribute Summary
#javascript_resources, #plugin_resources, #sass_resources, #template_resources
#workflow_resources
Instance Method Summary
collapse
-
#_merge_chunks(sin, sout) ⇒ Object
-
#call(env) ⇒ Object
-
#copy_until_boundary(sin, sout, boundary) ⇒ Object
-
#do_stream(env) ⇒ Object
-
#get_inputs(content_type, stream) ⇒ Object
-
#initialize(app) ⇒ StreamWorkflowTask
constructor
A new instance of StreamWorkflowTask.
-
#merge_chunks(sin, sout, buffer) ⇒ Object
-
#parse_uri(env) ⇒ Object
-
#read_normal_inputs(io, boundary, stream_input) ⇒ Object
-
#run_job(workflow, task, inputs, stream_input, stream, boundary, filename = nil) ⇒ Object
#add_GET_param, add_sass_load_path, #add_search_paths, #cache, #check_step, #consume_parameter, #css_resources, css_resources, #development?, #error_for, #file_or_text_area, #file_resources, file_resources, #filter, #find_all, #find_all_server_files, #fix_input, #form_input, #fragment, #glob_all, #glob_all_server_files, #hash2dl, #header, #html_tag, #input_label, javascript_resources, #json_resource, #link_css, #link_js, #load_tsv, load_tsv, #locate_css, #locate_file, #locate_javascript, #locate_sass, #locate_server_file, #locate_template, #log, #modal_fragment, #old_cache, #paginate, #param2boolean, #parse_page, #partial_render, #permalink, #prepare_input, #process_common_parameters, #production?, #progress, #record_css, #record_js, #recorded_css_files, #recorded_js_files, #remove_GET_param, #render, #render_partial, #render_sass, #reset_js_css, #resource, #reveal, sass_resources, save_tsv, #save_tsv, #serve_css, #serve_js, #sync_json_resources, #table, #table_value, #tabs, #tar_file, #template_render, template_resources, #traverse, #tsv2html, #tsv_process, #tsv_rows, #tsv_rows_full, #wait_on
#abort_job, #clean_job, #complete_input_set, #consume_task_parameters, #execution_type, #issue_job, #locate_workflow_template, #prepare_job_inputs, #recursive_clean_job, #show_exec_result, #show_result, #show_result_html, #stream_job, #type_of_export, #workflow_partial, #workflow_render, workflow_resources
Constructor Details
Returns a new instance of StreamWorkflowTask.
5
6
7
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 5
def initialize(app)
@app = app
end
|
Instance Method Details
#_merge_chunks(sin, sout) ⇒ Object
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 139
def _merge_chunks(sin, sout)
begin
while true
chunk_size_str = ""
stop = false
while chunk_size_str.strip.empty?
chunk_size_str = sin.gets
raise "Empty chunk size" if chunk_size_str.nil? or chunk_size_str.strip.empty?
chunk_size_str = "" if chunk_size_str.nil?
end
break if stop
size = chunk_size_str.strip.to_i(16)
break if size == 0
chunk = sin.read(size)
bound = sin.read(2)
raise "bound not right: #{ bound }" if bound != EOL
raise "Size does not match: #{[chunk.length, size] * " != "}" if chunk.length != size
sout.write chunk
end
rescue Aborted
raise $!
rescue StandardError
Log.exception $!
raise $!
ensure
end
end
|
#call(env) ⇒ Object
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 215
def call(env)
if do_stream(env)
begin
client = env["rack.hijack"]
buffer = client.instance_variable_get('@buffer')
tcp_io = client.call
Log.low "Hijacking post data #{tcp_io}"
content_type = env["CONTENT_TYPE"]
encoding = env["HTTP_TRANSFER_ENCODING"]
if env["rack.input"]
tcp_merged_io = Misc.open_pipe do |sin|
rinput = env["rack.input"]
sin << rinput.instance_variable_get("@rbuf")
while c = rinput.gets
sin.puts c
end
end
else
if encoding == "chunked"
Log.low "Merging chunks #{tcp_io}"
tcp_merged_io = Misc.open_pipe do |sin|
begin
merge_chunks(tcp_io, sin, buffer);
rescue StandardError
ensure
begin
tcp_io.close_read;
rescue
end
end
end
else
tcp_merged_io = tcp_io
end
end
inputs, stream_input, filename, stream, boundary = get_inputs(content_type, tcp_merged_io)
workflow, task = parse_uri(env)
job = run_job(workflow, task, inputs, stream_input, stream, boundary, filename)
job_url = File.join("/", workflow.to_s, task, job.name)
raise "Job aborted" if job.aborted?
raise job.messages.last if job.error?
out_stream = TSV.get_stream job
begin
Log.high "Write response #{Misc.fingerprint tcp_io} "
tcp_io.write "HTTP/1.1 200\r\n"
tcp_io.write "Connection: close\r\n"
tcp_io.write "RBBT-STREAMING-JOB-URL: #{ job_url }\r\n"
tcp_io.write "\r\n"
Log.high "Comsuming response #{Misc.fingerprint tcp_io}"
begin
while l = out_stream.readpartial(2048)
tcp_io.write l
end
rescue EOFError
end
Log.high "Comsumed response #{Misc.fingerprint tcp_io}"
out_stream.join if out_stream.respond_to? :join
rescue Exception
Log.exception $!
raise $!
end if out_stream
tcp_io.close_write unless tcp_io.closed?
Log.high "Closed io #{tcp_io}"
[-1, {}, []]
rescue Exception
Log.exception $!
job.exception $! if job
tcp_io.write "HTTP/1.1 500\r\n"
tcp_io.write "Connection: close\r\n"
tcp_io.write "\r\n"
tcp_io.close_write
raise $!
end
else
Log.low "NOT Hijacking post data"
@app.call(env)
end
end
|
#copy_until_boundary(sin, sout, boundary) ⇒ Object
58
59
60
61
62
63
64
65
66
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 58
def copy_until_boundary(sin, sout, boundary)
last_line = nil
while line = sin.gets
break if line.include? boundary
sout.write last_line
last_line = line
end
sout.write last_line.strip unless last_line.nil? or last_line == EOL
end
|
#do_stream(env) ⇒ Object
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 197
def do_stream(env)
uri = env["REQUEST_URI"]
post = env["REQUEST_METHOD"]
return false unless post == "POST"
hijack = !!env["rack.hijack"]
return false unless hijack
content_type = env["CONTENT_TYPE"]
return false unless content_type and content_type.include? "Rbbt_Param_Stream"
encoding = env["HTTP_TRANSFER_ENCODING"]
return false unless encoding.nil? or encoding == "chunked"
true
end
|
68
69
70
71
72
73
74
75
76
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 68
def get_inputs(content_type, stream)
boundary = content_type.match(/boundary=([^\s;]*)/)[1]
stream_input = content_type.match(/stream=([^\s;]*)/)[1]
inputs, filename = read_normal_inputs(stream, boundary, stream_input)
IndiferentHash.setup(inputs)
[inputs, stream_input, filename, stream, boundary]
end
|
#merge_chunks(sin, sout, buffer) ⇒ Object
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 177
def merge_chunks(sin, sout, buffer)
if buffer.nil?
_merge_chunks(sin, sout)
else
ssin = Misc.open_pipe do |s|
begin
s << buffer
while c = sin.readpartial(Misc::BLOCK_SIZE)
s << c
end
rescue Aborted, IOError
rescue Exception
ensure
s.close
end
end
_merge_chunks(ssin, sout)
end
end
|
#parse_uri(env) ⇒ Object
11
12
13
14
15
16
17
18
19
20
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 11
def parse_uri(env)
uri = env["REQUEST_URI"]
_n, workflow, task = uri.split("/")
workflow = begin
Kernel.const_get(workflow)
rescue
raise "Could not accept task for workflow: #{ workflow }"
end
[workflow, task]
end
|
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 22
def read_normal_inputs(io, boundary, stream_input)
inputs = {}
input_name = nil
variable_chunk = nil
filename = nil
while line = io.gets
line.chomp!
chunk_start = line == "--" + boundary
if chunk_start
if input_name
inputs[input_name] = variable_chunk
end
content_start = false
elsif content_start
if variable_chunk.empty?
variable_chunk << line
else
variable_chunk << "\n" << line
end
elsif line =~ /^Content.* name="([^\s;"]*)"/
input_name = $1
filename = line.match(/filename="([^"]+)"/)[1] if line =~ /filename/
elsif line.empty?
variable_chunk = ""
break if input_name == stream_input
content_start = true
end
end
[inputs, filename]
end
|
#run_job(workflow, task, inputs, stream_input, stream, boundary, filename = nil) ⇒ Object
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
# File 'lib/rbbt/rest/workflow/stream_task.rb', line 78
def run_job(workflow, task, inputs, stream_input, stream, boundary, filename = nil)
name = inputs.delete "jobname"
task_parameters = prepare_job_inputs(workflow, task, inputs)
IndiferentHash.setup task_parameters
Misc.add_stream_filename(stream, filename) if filename
clean_stream = Misc.open_pipe do |sin|
begin
copy_until_boundary(stream, sin, boundary)
rescue
Log.exception $!
end
end
ConcurrentStream.setup(clean_stream, :filename => filename)
task_parameters[stream_input.to_sym] = clean_stream
task = task.to_sym
Log.low "Running streaming job #{[workflow, task] * "/" }: #{Misc.fingerprint task_parameters}"
job = workflow.job(task, name, task_parameters)
job.clean if job.aborted?
execution_type = type_of_export(workflow, task)
execution_type = "exec" if inputs["_cache_type"] == 'exec'
begin
case execution_type.to_s
when "exec", nil
job.exec(:stream)
when "sync", "synchronous", "async", "asynchronous"
if job.done? or job.started?
done_consumer = Thread.new do
Misc.consume_stream(clean_stream)
end
job.join unless job.done?
else
job.run(:stream)
end
else
raise "Unknown execution_type: #{Misc.inspect execution_type}"
end
rescue Aborted, Interrupt
job.abort
stream.write "HTTP/1.1 500\r\n"
stream.close_write
rescue Exception
job.exception $!
stream.write "HTTP/1.1 500\r\n"
stream.close_write
end
job
end
|