Class: RExec::Task
- Inherits: Object
- Hierarchy: Object, RExec::Task
- Defined in: lib/rexec/task.rb
Overview
Represents a running process, either a child process or a background/daemon process. Provides an easy, high-level interface for managing the process life cycle.
Instance Attribute Summary
- #error ⇒ Object (readonly): Standard error from the running task.
- #input ⇒ Object (readonly): Standard input to the running task.
- #output ⇒ Object (readonly): Standard output from the running task.
- #pid ⇒ Object (readonly): The PID of the running task.
- #result ⇒ Object (readonly): The status of the task after calling task.wait.
Class Method Summary
- .open(command, options = {}, &block) ⇒ Object: Open a process.
- .running?(pid) ⇒ Boolean: Returns true if the given pid is a current process.
- .spawn_child(&block) ⇒ Object: Very simple method to spawn a child process.
- .spawn_daemon(&block) ⇒ Object: Very simple method to spawn a child daemon.
Instance Method Summary
- #close ⇒ Object: Close all connections to the child process.
- #close_input ⇒ Object: Close input pipe to child process (if applicable).
- #initialize(input, output, error, pid) ⇒ Task (constructor): A new instance of Task.
- #kill(signal = "INT") ⇒ Object: Send a signal to the child process.
- #running? ⇒ Boolean: Returns true if the current task is still running.
- #stop ⇒ Object: Forcefully stop the child process.
- #wait ⇒ Object: Wait for the child process to finish, return the exit status.
Constructor Details
#initialize(input, output, error, pid) ⇒ Task
Returns a new instance of Task.
    # File 'lib/rexec/task.rb', line 341
    def initialize(input, output, error, pid)
      @input = input
      @output = output
      @error = error

      @pid = pid

      @result = nil
      @status = :running
      @result_lock = Mutex.new
      @result_available = ConditionVariable.new
    end
Instance Attribute Details
#error ⇒ Object (readonly)
Standard error from the running task.
    # File 'lib/rexec/task.rb', line 361
    def error
      @error
    end
#input ⇒ Object (readonly)
Standard input to the running task.
    # File 'lib/rexec/task.rb', line 355
    def input
      @input
    end
#output ⇒ Object (readonly)
Standard output from the running task.
    # File 'lib/rexec/task.rb', line 358
    def output
      @output
    end
#pid ⇒ Object (readonly)
The PID of the running task.
    # File 'lib/rexec/task.rb', line 364
    def pid
      @pid
    end
#result ⇒ Object (readonly)
The status of the task after calling task.wait.
    # File 'lib/rexec/task.rb', line 367
    def result
      @result
    end
Class Method Details
.open(command, options = {}, &block) ⇒ Object
Open a process. Similar to IO.popen, but provides a much more generic interface to stdin, stdout, stderr and the pid. We also attempt to tidy up as much as possible given some kind of error or exception. You may write to input, and read from output and error.
Typical usage looks similar to IO.popen:
    count = 0
    result = Task.open(["ls", "-la"], :passthrough => :err) do |task|
      count = task.output.read.split(/\n/).size
    end
    puts "Count: #{count}" if result.exitstatus == 0
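For comparison, the same line-counting pattern can be sketched with the standard library's IO.popen alone. This is not RExec code: count_output_lines is a hypothetical helper, and the child command is a made-up Ruby one-liner (run through RbConfig.ruby) chosen only so that the output is predictable.

```ruby
require "rbconfig"

# Hypothetical helper: run `command` (an Array, so no shell is involved),
# count the lines it prints, and return [count, exit_status].
def count_output_lines(command)
  count = 0
  IO.popen(command) do |io|
    # Read everything the child wrote to its stdout and count the lines.
    count = io.read.split(/\n/).size
  end
  # After the block form of IO.popen returns, $? holds the child's status.
  [count, $?]
end

# Illustrative child: a Ruby one-liner that prints three lines.
count, status = count_output_lines([RbConfig.ruby, "-e", "puts 'a', 'b', 'c'"])
puts "Count: #{count}" if status.exitstatus == 0
```

IO.popen only exposes one stream per call, which is the limitation the more generic Task.open interface is described as addressing.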
The basic command can simply be a string, and this will be passed to Kernel#exec which will perform shell expansion on the arguments.
If the command passed is an array, this will be executed without shell expansion.
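The difference between the string and array forms can be observed with the standard library, since IO.popen follows the same string-versus-array rule as Kernel#exec. The sketch below assumes a POSIX system with sh and echo on the PATH; capture and the GREETING variable are illustrative names, not part of RExec.

```ruby
# Hypothetical helper: run a command with GREETING set in its environment
# and return what it printed, without the trailing newline.
def capture(command)
  IO.popen({ "GREETING" => "hi" }, command) { |io| io.read.chomp }
end

# String form: the "$" is a shell metacharacter, so the command goes
# through the shell, which expands $GREETING.
expanded = capture("echo $GREETING")

# Array form: executed directly, so "$GREETING" is passed as a literal argument.
literal = capture(["echo", "$GREETING"])

puts expanded
puts literal
```

The array form is the safer choice whenever arguments come from untrusted input, because nothing is interpreted by a shell.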
If a Proc (or anything that respond_to? :call) is provided, this will be executed in the child process. Here is an example of a long running background process:
    daemon = Proc.new do
      # Long running process
      sleep(1000)
    end
    task = Task.open(daemon, :daemonize => true, :in => …, :out => …, :err => …)
    exit(0)
Options
:passthrough, :in, :out, :err
- The current process (e.g. ruby) has a set of existing pipes $stdin, $stdout and $stderr. These pipes can also be used by the child process. The passthrough option allows you to specify which pipes are retained from the parent process by the child.
  Typically it is useful to pass through $stderr, so that errors in the child process are printed out in the terminal of the parent process:
      Task.open(…, :passthrough => :err)
      Task.open(…, :passthrough => [:in, :out, :err])
      Task.open(…, :passthrough => :all)
  It is also possible to redirect to files, which can be useful if you want to keep a log file:
      Task.open(…, :out => File.open("output.log", "a"))
  The default behaviour is to create a new pipe, but any pipe (e.g. a network socket) could be used:
      Task.open(…, :in => IO.pipe)
:daemonize
- The process that is opened may be detached from the parent process. This allows the child process to exist even if the parent process exits. In this case, you will also probably want to specify the :in, :out and :err options so that output goes to log files:
      Task.open(…,
        :daemonize => true,
        :in => File.open("/dev/null"),
        :out => File.open("/var/log/child.log", "a"),
        :err => File.open("/var/log/child.err", "a")
      )
:env, :env!
- Provide an environment which will be used by the child process. Use :env to update the existing environment and :env! to replace it completely.
      Task.open(…, :env => {… => 'bar'})
:umask
- Set the umask for the new process, as per File.umask.
:chdir
- Set the current working directory for the new process, as per Dir.chdir.
:preflight
- Similar to a proc based command, but executed before execing the given process.
      preflight = Proc.new do |command, options|
        # Setup some default state before exec the new process.
      end
      Task.open(…, :preflight => preflight)
  The options hash is passed directly so you can supply custom arguments to the preflight function.
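To make the effect of the environment, umask and working-directory options concrete, here is a minimal standard-library sketch of applying such settings inside a forked child. It is not RExec's implementation: describe_prepared_child is a hypothetical helper, and the FOO variable and the 0o027 umask are arbitrary illustrative values.

```ruby
require "tmpdir"

# Hypothetical helper, not part of RExec: fork a child, apply the
# :env, :umask and :chdir settings there, and report the child's
# resulting state back to the parent over a pipe.
def describe_prepared_child(options = {})
  reader, writer = IO.pipe

  pid = fork do
    reader.close

    # These calls only affect the child; the parent's state is untouched.
    ENV.update(options[:env]) if options[:env]
    File.umask(options[:umask]) if options[:umask]
    Dir.chdir(options[:chdir]) if options[:chdir]

    writer.puts(ENV["FOO"].to_s, Dir.pwd, format("%04o", File.umask))
    writer.close
    exit!(0)
  end

  writer.close
  lines = reader.read.split("\n")
  reader.close
  Process.wait(pid)
  lines
end

Dir.mktmpdir do |dir|
  env_value, cwd, umask = describe_prepared_child(
    :env => { "FOO" => "bar" }, :umask => 0o027, :chdir => dir
  )
  puts env_value, cwd, umask
end
```

Because every setting is applied after the fork, the parent keeps its own environment, umask and working directory.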
    # File 'lib/rexec/task.rb', line 278
    def self.open(command, options = {}, &block)
      cin, cout, cerr = …(options)
      spawn = options[:daemonize] ? :spawn_daemon : :spawn_child

      cid = self.send(spawn) do
        close_pipes(cin[WR], cout[RD], cerr[RD])

        STDIN.reopen(cin[RD]) if cin[RD]
        STDOUT.reopen(cout[WR]) if cout[WR]
        STDERR.reopen(cerr[WR]) if cerr[WR]

        prepare_child_environment(command, options)

        if command.respond_to? :call
          command.call
        elsif Array === command
          # If command is a Pathname, we need to convert it to an absolute path if possible,
          # otherwise if it is relative it might cause problems.
          if command[0].respond_to? :realpath
            command[0] = command[0].realpath
          end

          # exec expects an array of Strings:
          command.collect! { |item| item.to_s }

          # Ensure that we DON'T use the shell for execution:
          command[0] = [command[0], command[0]]

          exec *command
        else
          if command.respond_to? :realpath
            command = command.realpath
          end

          exec command.to_s
        end
      end

      close_pipes(cin[RD], cout[WR], cerr[WR])

      task = Task.new(cin[WR], cout[RD], cerr[RD], cid)

      if block_given?
        begin
          yield task

          # Close all input pipes if not done already.
          task.close_input

          # The task has stopped if task.wait returns correctly.
          return task.wait
        rescue Interrupt
          # If task.wait is interrupted, we should also interrupt the child process
          task.kill
        ensure
          # Print out any remaining data from @output or @error
          task.close
        end
      else
        return task
      end
    end
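The pipe wiring in this method (the child reopens its standard streams onto one end of each pipe, while the parent keeps the other end) can be sketched with the standard library alone. The helper below is a simplified illustration, not RExec's code: open_piped_child is a hypothetical name, it handles only stdin and stdout, and it runs a block in the child instead of calling exec.

```ruby
# Hypothetical helper: fork a child whose stdin/stdout are connected to the
# parent through two fresh pipes. Returns [input, output, pid] as seen from
# the parent.
def open_piped_child
  in_rd, in_wr = IO.pipe    # parent writes to in_wr, child reads from in_rd
  out_rd, out_wr = IO.pipe  # child writes to out_wr, parent reads from out_rd

  $stdout.flush # avoid carrying buffered parent output into the child

  pid = fork do
    # Close the parent's ends in the child.
    in_wr.close
    out_rd.close

    STDIN.reopen(in_rd)
    STDOUT.reopen(out_wr)

    yield

    STDOUT.flush
    exit!(0)
  end

  # Close the child's ends in the parent, otherwise reads never see EOF.
  in_rd.close
  out_wr.close

  [in_wr, out_rd, pid]
end

input, output, pid = open_piped_child { STDOUT.write(STDIN.read.upcase) }

input.write("hello")
input.close # the child's STDIN.read returns once the input is closed
reply = output.read
output.close
_pid, status = Process.wait2(pid)

puts reply if status.success?
```

Closing the input before reading matters here: the child in this sketch reads until end of file, so it produces no output until the parent closes its write end.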
.running?(pid) ⇒ Boolean
Returns true if the given pid is a current process.
    # File 'lib/rexec/task.rb', line 141
    def self.running?(pid)
      gpid = Process.getpgid(pid) rescue nil

      return gpid != nil ? true : false
    end
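The same liveness check can be tried in isolation. The sketch below uses a hypothetical pid_alive? helper that rescues only Errno::ESRCH (the listing above rescues any error), and it assumes the pid of the reaped child is not immediately reused by another process.

```ruby
# Hypothetical helper mirroring the check above: a pid counts as "current"
# when Process.getpgid can look it up.
def pid_alive?(pid)
  Process.getpgid(pid)
  true
rescue Errno::ESRCH
  false
end

child = fork { exit!(0) }
Process.wait(child) # reap the child so that its pid is released

puts pid_alive?(Process.pid) # the current process can always be looked up
puts pid_alive?(child)       # the reaped child can no longer be looked up
```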
.spawn_child(&block) ⇒ Object
Very simple method to spawn a child process.
    spawn_child do
      puts "Hello from child!"
    end
    # File 'lib/rexec/task.rb', line 187
    def self.spawn_child(&block)
      pid = fork do
        yield

        exit!(0)
      end

      return pid
    end
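A small standard-library sketch shows two consequences of this fork-based approach: the block runs in a copy of the parent's memory, and the child's exit status can be collected with Process.wait2. run_in_child is a hypothetical helper that, unlike spawn_child, waits for the child before returning.

```ruby
# Hypothetical helper: run the block in a forked child, wait for it,
# and return its Process::Status.
def run_in_child
  pid = fork do
    yield
    exit!(0) # exit immediately, skipping the parent's at_exit handlers
  end
  _pid, status = Process.wait2(pid)
  status
end

counter = 0
status = run_in_child { counter += 1 }

puts counter           # the parent's copy is unchanged by the child
puts status.exitstatus
```

Any result the parent needs from the child therefore has to travel through a pipe, a file or the exit status.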
.spawn_daemon(&block) ⇒ Object
Very simple method to spawn a child daemon. A daemon is detached from the controlling tty, and thus is not killed when the parent process finishes.
    spawn_daemon do
      Dir.chdir("/")
      File.umask 0000
      puts "Hello from daemon!"
      sleep(600)
      puts "This code will not quit when parent process finishes…"
      puts "…but $stdout might be closed unless you set it to a file."
    end
    # File 'lib/rexec/task.rb', line 158
    def self.spawn_daemon(&block)
      pid_pipe = IO.pipe

      fork do
        Process.setsid
        exit if fork

        # Send the pid back to the parent
        pid_pipe[RD].close
        pid_pipe[WR].write(Process.pid.to_s)
        pid_pipe[WR].close

        yield

        exit(0)
      end

      pid_pipe[WR].close
      pid = pid_pipe[RD].read
      pid_pipe[RD].close

      return pid.to_i
    end
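The double-fork pattern used here can be exercised with a short standalone sketch. spawn_detached is a hypothetical helper that follows the same steps (fork, setsid, fork again, send the grandchild's pid back over a pipe); as a deliberate difference from the listing, it also reaps the short-lived intermediate process and uses exit! in the children.

```ruby
# Hypothetical helper following the same double-fork pattern as above.
def spawn_detached
  reader, writer = IO.pipe

  middle = fork do
    reader.close
    Process.setsid   # new session, so no controlling terminal
    exit!(0) if fork # the intermediate process exits; the grandchild continues

    # Send the grandchild's pid back to the original parent.
    writer.write(Process.pid.to_s)
    writer.close

    yield
    exit!(0)
  end

  writer.close
  Process.wait(middle)   # the intermediate process exits almost immediately
  pid = reader.read.to_i # EOF arrives once the grandchild closes its end
  reader.close
  pid
end

daemon_pid = spawn_detached { sleep 0.1 }
puts daemon_pid
```

Because the returned pid belongs to a grandchild, the caller cannot wait on it with Process.wait; it can only be signalled or polled.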
Instance Method Details
#close ⇒ Object
Close all connections to the child process.
    # File 'lib/rexec/task.rb', line 383
    def close
      begin
        self.class.dump_pipes(@output, @error)
      ensure
        self.class.close_pipes(@input, @output, @error)
      end
    end
#close_input ⇒ Object
Close input pipe to child process (if applicable).
    # File 'lib/rexec/task.rb', line 392
    def close_input
      @input.close if @input and !@input.closed?
    end
#kill(signal = "INT") ⇒ Object
Send a signal to the child process.
    # File 'lib/rexec/task.rb', line 397
    def kill(signal = "INT")
      if running?
        Process.kill(signal, @pid)
      else
        raise Errno::ECHILD
      end
    end
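What a signalled child looks like from the parent's side can be sketched with Process.kill and Process.wait2. The helpers below are hypothetical (start_sleeper, signal_and_reap) and use only the standard library; the readiness pipe exists just to make sure the child is running before it is signalled.

```ruby
# Hypothetical helper: start a child that reports readiness and then sleeps.
def start_sleeper
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    writer.write("ready")
    writer.close
    sleep 60
    exit!(0)
  end
  writer.close
  reader.read # returns once the child has written and closed its end
  reader.close
  pid
end

# Hypothetical helper: send `signal` to the child, then collect its status.
def signal_and_reap(pid, signal = "INT")
  Process.kill(signal, pid)
  _pid, status = Process.wait2(pid)
  status
end

status = signal_and_reap(start_sleeper)
puts status.success? ? "exited cleanly" : "did not exit cleanly"
```

Once the child has been reaped, a further Process.kill on the same pid raises Errno::ESRCH, which is why a liveness check before signalling is useful.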
#running? ⇒ Boolean
Returns true if the current task is still running.
    # File 'lib/rexec/task.rb', line 370
    def running?
      if self.class.running?(@pid)
        # The pid still seems alive, check that it isn't some other process using the same pid...
        @result_lock.synchronize do
          # If we haven't waited for it yet, it must be either a running process or a zombie...
          return @status != :stopped
        end
      end

      return false
    end
#stop ⇒ Object
Forcefully stop the child process.
    # File 'lib/rexec/task.rb', line 446
    def stop
      if running?
        close_input
        kill
      end
    end
#wait ⇒ Object
Wait for the child process to finish, return the exit status. This function can be called from multiple threads.
    # File 'lib/rexec/task.rb', line 407
    def wait
      begin_wait = false

      # Check to see if some other caller is already waiting on the result...
      @result_lock.synchronize do
        case @status
        when :waiting
          # If so, wait for the wait to finish...
          @result_available.wait(@result_lock)
        when :running
          # Else, mark that we should begin waiting...
          begin_wait = true
          @status = :waiting
        when :stopped
          return @result
        end
      end

      # If we should begin waiting (the first thread to wait)...
      if begin_wait
        begin
          # Wait for the result...
          _pid, @result = Process.wait2(@pid)
        end

        # The result is now available...
        @result_lock.synchronize do
          @status = :stopped
        end

        # Notify other threads...
        @result_available.broadcast()
      end

      # Return the result
      return @result
    end
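The multi-threaded waiting scheme can be tried out with a standalone sketch. SharedWait is a hypothetical class, not RExec's implementation: it keeps the same three states (:running, :waiting, :stopped), but it re-checks the state in a loop around the condition variable to guard against spurious wakeups, and it publishes the result while holding the lock.

```ruby
# Hypothetical class: lets several threads wait on one child process while
# only the first caller actually calls Process.wait2.
class SharedWait
  def initialize(pid)
    @pid = pid
    @status = :running
    @result = nil
    @lock = Mutex.new
    @available = ConditionVariable.new
  end

  def wait
    @lock.synchronize do
      case @status
      when :stopped
        return @result
      when :waiting
        # Another thread is already waiting; sleep until it publishes the result.
        @available.wait(@lock) until @status == :stopped
        return @result
      else
        @status = :waiting
      end
    end

    # Only the first caller gets here; the lock is not held while blocking.
    _pid, result = Process.wait2(@pid)

    @lock.synchronize do
      @result = result
      @status = :stopped
      @available.broadcast
    end

    result
  end
end

pid = fork { sleep 0.2; exit!(7) }
shared = SharedWait.new(pid)
codes = 4.times.map { Thread.new { shared.wait.exitstatus } }.map(&:value)
p codes
```

Calling Process.wait2 from a single thread matters because a child can be reaped only once; a second direct call for the same pid would raise Errno::ECHILD.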