Module: Brush::Pipeline
Defined Under Namespace
Modules: POSIX, Win32 Classes: ProcessIOResult, SysInfo
Constant Summary collapse
- PARENT_PIPES =
{}
Instance Method Summary collapse
- #child_pipe ⇒ Object
- #duck_type_status_object(object, status_or_pid, status_integer = nil) ⇒ Object
- #each_path_element ⇒ Object
- #generic_pipe(p_pipe, ch_pipe) ⇒ Object
- #input_pipe(&block) ⇒ Object
- #mark_parent_pipe(pipe) ⇒ Object
- #output_pipe(&block) ⇒ Object
-
#pipeline(*elements) ⇒ Object
Create and execute a pipeline consisting of commands.
-
#process_io(io_sym, target, original_stdfiles, close_pipes) ⇒ Object
File or IO, String (empty), String (filename), String (data), StringIO, Integer, :stdout, :stderr, :null or nil, :zero, Symbol (other), Array (new command) —- supported: File or IO, StringIO, Integer, :stdout, :stderr, :null or nil, :zero, Array (new command) —- future: String (empty), String (filename), String (data), Symbol (other),.
- #sys(*argv) ⇒ Object
-
#sys_start(*argv) ⇒ Object
Options to each pipeline element include: :stdin — file, pipe, buffer, or :console to feed into the first element of the pipeline.
Methods included from POSIX
#create_process, #find_in_path, #sys_wait
Methods included from Win32
#create_process, #each_pathext_element, #find_in_path, #make_handle_inheritable, #sys_wait
Instance Method Details
#child_pipe ⇒ Object
288 289 290 291 292 |
# File 'lib/brush/pipeline.rb', line 288 def child_pipe r, w = *IO.pipe sysinfo = yield r, w ProcessIOResult.new(w, r, sysinfo.threads, sysinfo.process_infos) end |
#duck_type_status_object(object, status_or_pid, status_integer = nil) ⇒ Object
302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 |
# File 'lib/brush/pipeline.rb', line 302 def duck_type_status_object (object, status_or_pid, status_integer = nil) if status_integer.nil? and status_or_pid.respond_to?(:success?) class << object # Act like the Process::Status @status. (Process::Status.instance_methods - self.instance_methods).each do |m| eval("def #{m} (*args); @status.#{m}(*args); end") end end object.instance_variable_set(:@status, status_or_pid) else class << object end object.instance_variable_set(:@status, status_or_pid) class << object attr_reader :to_i, :pid # We have no idea if we exited normally, coredumped, etc. Just # pretend it's normal. def coredump?; false; end def exited?; true; end def signaled?; false; end def stopped?; false; end def stopsig; nil; end def success?; @to_i.zero?; end def termsig; nil; end alias exitstatus to_i # Act like the Fixnum in @to_i. (Fixnum.instance_methods - self.instance_methods).each do |m| eval("def #{m} (*args); @to_i.#{m}(*args); end") end end object.instance_variable_set(:@to_i, status_integer) object.instance_variable_set(:@pid, status_or_pid) end object end |
#each_path_element ⇒ Object
295 296 297 298 299 |
# File 'lib/brush/pipeline.rb', line 295 def each_path_element ENV['PATH'].split(File::PATH_SEPARATOR).each do |dir| yield File.(dir) end end |
#generic_pipe(p_pipe, ch_pipe) ⇒ Object
255 256 257 258 259 260 261 262 263 264 265 266 |
# File 'lib/brush/pipeline.rb', line 255 def generic_pipe (p_pipe, ch_pipe) mark_parent_pipe(p_pipe) t = Thread.new do begin yield p_pipe rescue Exception ensure p_pipe.close end end ProcessIOResult.new(ch_pipe, nil, [t]) end |
#input_pipe(&block) ⇒ Object
280 281 282 |
# File 'lib/brush/pipeline.rb', line 280 def input_pipe (&block) generic_pipe *IO.pipe.reverse, &block end |
#mark_parent_pipe(pipe) ⇒ Object
268 269 270 271 272 273 274 275 276 277 278 |
# File 'lib/brush/pipeline.rb', line 268 def mark_parent_pipe (pipe) class << pipe def close super ensure Brush::Pipeline::PARENT_PIPES.delete(self) end end Brush::Pipeline::PARENT_PIPES[pipe] = true end |
#output_pipe(&block) ⇒ Object
284 285 286 |
# File 'lib/brush/pipeline.rb', line 284 def output_pipe (&block) generic_pipe *IO.pipe, &block end |
#pipeline(*elements) ⇒ Object
Create and execute a pipeline consisting of commands. Each element of the pipeline is an array of command arguments and options for that element of the pipeline.
Options to each pipeline element include:
:executable — specifies an alternative binary to run, instead of
using the value for argv[0].
:cd — change into this directory for this program.
---- probable future options
:env — pass an alternative set of environment variables to the
process.
:stderr — file, pipe, or buffer to collect error information.
:as_user — Array specifying user and credentials to run as.
Options to the entire pipeline include:
:stdin — file, pipe, or buffer to feed into the first element of
the pipeline.
:stdout — file, pipe, or buffer to collect the output of the
last element of the pipeline.
The return value is an Array reporting the success or failure of each element of the pipeline. Each element of the array is an Object that emulates a Process::Status object.
Example:
extracted_files = String.new
Brush::Pipeline.pipeline(
['gzip', '-cd', 'filename.tar.gz', :cd => 'Downloads'],
['tar', 'xvf', '-', :cd => 'Extractions'],
:stdout => extracted_files)
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/brush/pipeline.rb', line 43 def pipeline (*elements) = { :stdin => $stdin, :stdout => $stdout } if elements[-1].respond_to?(:has_key?) .merge!(elements.pop) end if elements.size == 0 raise "invalid use of pipeline: no commands given" end # Don't modify the originals, and make sure we have an options hash # for each element. elements = elements.collect do |argv| argv = argv.dup argv.push(Hash.new) if not argv[-1].respond_to?(:has_key?) argv end # Feed the input and the output elements[0][-1][:stdin] = [:stdin] elements[-1][-1][:stdout] = [:stdout] # Build up the structure for the call to #sys. elements.each_with_index do |argv, index| argv[-1][:stdout] = elements[index + 1] if index < elements.size - 1 argv[-1][:stderr] = [:stderr] if .has_key?(:stderr) end sys(*elements[0]) end |
#process_io(io_sym, target, original_stdfiles, close_pipes) ⇒ Object
File or IO, String (empty), String (filename), String (data), StringIO, Integer, :stdout, :stderr, :null or nil, :zero, Symbol (other), Array (new command) —- supported: File or IO, StringIO, Integer, :stdout, :stderr, :null or nil, :zero, Array (new command) —- future: String (empty), String (filename), String (data), Symbol (other),
Returns an array of stuff:
[IO object, thread or threads, pipe or pipes, process info objects]
The IO object is the processed IO object based on the input IO object (taret
), which may not have actually been an IO object. Threads returned, if any, need to be joined after the process terminates. Pipes returned, if any, need to be closed after the process terminates. Process info objects, if any, refer to other processes running in the pipeline that this call to process_io created.
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
# File 'lib/brush/pipeline.rb', line 195 def process_io (io_sym, target, original_stdfiles, close_pipes) # Firstly, any File or IO value can be returned as it is. # We will duck-type recognize File and IO objects if they respond # to :fileno and the result of calling #fileno is not nil. if target.respond_to?(:fileno) and not target.fileno.nil? return ProcessIOResult.new(target) end # Integer (Fixnum in particular) arguments represent file # descriptors to attach this IO to. return ProcessIOResult.new(IO.new(target)) if target.is_a? Fixnum # Handle special symbol values for :stdin. Valid symbols are # :null and :zero. Using :null is the same as +nil+ (no input), # and using :zero is like sending an endless stream of null # characters to the process. if io_sym == :stdin if target.nil? or target == :null return input_pipe {} elsif target == :zero return input_pipe { |w| w.syswrite("\x00" * 1024) while true } elsif target.respond_to?(:sysread) # "fake" IO and StringIO return input_pipe do |w| data = nil; w.syswrite(data) while data = target.sysread(1024) end else raise "Invalid input object in Brush#sys" end # Handle special symbol values for :stdout and :stderr. Valid # symbols are :null, :zero, :stdout, and :stderr. The symbols # :null and :zero mean the output is thrown away. :stdout means # this output goes where standard output should go and :stderr # means this output goes where standard error should go. else # io_sym is :stdout or :stderr if target.nil? or target == :null or target == :zero return output_pipe { |r| r.sysread(1024) while true } elsif target == :stdout return process_io(io_sym, original_stdfiles[0], nil, close_pipes) elsif target == :stderr return process_io(io_sym, original_stdfiles[1], nil, close_pipes) elsif target.respond_to?(:syswrite) # "fake" IO and StringIO return output_pipe do |r| data = nil; target.syswrite(data) while data = r.sysread(1024) end elsif target.is_a?(Array) # pipeline return child_pipe do |r, w| argv = target.dup argv.push(Hash.new) if not argv[-1].respond_to?(:has_key?) argv[-1].merge!(:stdin => r, :close => [w] + close_pipes) sys_start(*argv) end else raise "Invalid output object in Brush#sys" end end end |
#sys(*argv) ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/brush/pipeline.rb', line 155 def sys (*argv) sysinfo = sys_start(*argv) overall_result = nil results = sysinfo.process_infos.collect do |pi| status = sys_wait(pi) overall_result = status if overall_result.nil? and not status.success? status end sysinfo.threads.each { |t| t.join } overall_result = results[-1] if overall_result.nil? duck_type_status_object(results, overall_result) end |
#sys_start(*argv) ⇒ Object
Options to each pipeline element include:
:stdin — file, pipe, buffer, or :console to feed into the first
element of the pipeline.
:stdout — file, pipe, or buffer to collect the output of the
last element of the pipeline.
:stderr — file, pipe, or buffer to collect error information.
:executable — specifies an alternative binary to run, instead of
using the value for argv[0].
:cd — change into this directory for this program.
:close — File or IO.fileno values to close after fork() or set
un-inheritable prior to calling ProcessCreate().
---- probable future options
:keep — File or IO.fileno values to keep open in child.
:timeout — terminate after a given time.
:env — pass an alternative set of environment variables.
:as_user — Array specifying user and credentials to run as.
process.
Returns an array of arrays:
[process objects, threads, pipes]
Process objects contain the pid and any relevent process and thread handles. Threads returned need to be joined to guarentee their input or output is completely processed after the program terminates. Pipes returned need to be closed after the program terminates.
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/brush/pipeline.rb', line 111 def sys_start (*argv) = { :stdin => $stdin, :stdout => $stdout, :stderr => $stderr, :executable => argv[0], :cd => '.', :close => [] } if argv[-1].respond_to?(:has_key?) .merge!(argv.pop) end [:executable] = find_in_path([:executable]) original_stdfiles = [:stdout, :stderr].collect do |io_sym| [io_sym] end upper_child_pipes = [] # pipes for children up the pipeline lower_child_pipes = [] # pipes for children down the pipeline threads = [] # threads handling special needs I/O process_infos = [] # info for children down the pipeline [:stdin, :stdout, :stderr].each do |io_sym| pior = process_io(io_sym, [io_sym], original_stdfiles, upper_child_pipes + lower_child_pipes + [:close]) [io_sym] = pior.io upper_child_pipes << pior.io if pior.threads lower_child_pipes << pior.pipe if pior.pipe threads.push *pior.threads if pior.threads process_infos.push *pior.process_infos if pior.process_infos end process_infos.unshift( create_process(argv, , lower_child_pipes + [:close])) upper_child_pipes.each { |io| io.close } lower_child_pipes.each { |io| io.close } SysInfo.new(process_infos, threads) end |