Class: RExec::Task
- Inherits: Object
- Inheritance: Object → RExec::Task
- Defined in: lib/rexec/task.rb
Overview
Represents a running process, either a child process or a background (daemon) process. Provides a simple, high-level interface for managing the process life cycle.
Instance Attribute Summary
- #error ⇒ Object (readonly): Standard error from the running task.
- #input ⇒ Object (readonly): Standard input to the running task.
- #output ⇒ Object (readonly): Standard output from the running task.
- #pid ⇒ Object (readonly): The PID of the running task.
- #result ⇒ Object (readonly): The status of the task after calling task.wait.
Class Method Summary
- .open(command, options = {}, &block) ⇒ Object: Open a process.
- .running?(pid) ⇒ Boolean: Returns true if the given pid is a current process.
- .spawn_child(&block) ⇒ Object: Very simple method to spawn a child process.
- .spawn_daemon(&block) ⇒ Object: Very simple method to spawn a child daemon.
Instance Method Summary
- #close ⇒ Object: Close all connections to the child process.
- #close_input ⇒ Object: Close input pipe to child process (if applicable).
- #initialize(input, output, error, pid) ⇒ Task (constructor): :nodoc:
- #kill(signal = "INT") ⇒ Object: Send a signal to the child process.
- #running? ⇒ Boolean: Returns true if the current task is still running.
- #stop ⇒ Object: Forcefully stop the child process.
- #wait ⇒ Object: Wait for the child process to finish, return the exit status.
Constructor Details
#initialize(input, output, error, pid) ⇒ Task
:nodoc:
    # File 'lib/rexec/task.rb', line 323
    def initialize(input, output, error, pid) # :nodoc:
      @input = input
      @output = output
      @error = error

      @pid = pid

      @result = nil
      @status = :running
      @result_lock = Mutex.new
      @result_available = ConditionVariable.new
    end
Instance Attribute Details
#error ⇒ Object (readonly)
Standard error from the running task.
    # File 'lib/rexec/task.rb', line 343
    def error
      @error
    end
#input ⇒ Object (readonly)
Standard input to the running task.
    # File 'lib/rexec/task.rb', line 337
    def input
      @input
    end
#output ⇒ Object (readonly)
Standard output from the running task.
    # File 'lib/rexec/task.rb', line 340
    def output
      @output
    end
#pid ⇒ Object (readonly)
The PID of the running task.
    # File 'lib/rexec/task.rb', line 346
    def pid
      @pid
    end
#result ⇒ Object (readonly)
The status of the task after calling task.wait.
    # File 'lib/rexec/task.rb', line 349
    def result
      @result
    end
Class Method Details
.open(command, options = {}, &block) ⇒ Object
Open a process. Similar to IO.popen, but provides a much more generic interface to stdin, stdout, stderr and the pid. It also attempts to tidy up as much as possible if an error or exception occurs. You may write to input, and read from output and error.
Typical usage looks similar to IO.popen:
    count = 0
    result = Task.open(["ls", "-la"], :passthrough => :err) do |task|
      count = task.output.read.split(/\n/).size
    end
    puts "Count: #{count}" if result.exitstatus == 0
The basic command can simply be a string, which is passed to Kernel#exec; this performs shell expansion on the arguments.
If the command passed is an array, it is executed without shell expansion.
If a Proc (or anything that responds to :call) is provided, it is executed in the child process. Here is an example of a long-running background process:
    daemon = Proc.new do
      # Long running process
      sleep(1000)
    end
    task = Task.open(daemon, :daemonize => true, :in => …, :out => …, :err => …)
    exit(0)
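The difference between a string command and an array command can be seen with IO.popen from the standard library, which follows the same convention. This is a minimal sketch rather than RExec code; the helper run_and_read and the variable REXEC_DEMO are names made up for illustration.

```ruby
# Hypothetical helper: run a command and return what it wrote to stdout.
# A String containing shell metacharacters goes through the shell;
# an Array is executed directly, with each element passed as one argument.
def run_and_read(command)
  IO.popen(command, &:read)
end

ENV["REXEC_DEMO"] = "expanded" # illustrative variable name

via_shell = run_and_read("echo $REXEC_DEMO")      # the shell substitutes the variable
direct    = run_and_read(["echo", "$REXEC_DEMO"]) # echo receives the literal text

puts via_shell
puts direct
```

The array form is usually the safer choice when arguments come from user input, since nothing is re-interpreted by a shell.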
Options
- :passthrough, :in, :out, :err
  The current process (e.g. ruby) has a set of existing pipes: $stdin, $stdout and $stderr. These pipes can also be used by the child process. The :passthrough option specifies which of the parent's pipes the child retains.
  Typically it is useful to pass through $stderr, so that errors in the child process are printed in the terminal of the parent process:
      Task.open(command, :passthrough => :err)
      Task.open(command, :passthrough => [:in, :out, :err])
      Task.open(command, :passthrough => :all)
  It is also possible to redirect to files, which can be useful if you want to keep a log file:
      Task.open(command, :out => File.open("output.log", "a"))
  The default behaviour is to create a new pipe, but any pipe (e.g. a network socket) could be used:
      Task.open(command, :in => IO.pipe)
- :daemonize
  The process that is opened may be detached from the parent process. This allows the child process to keep running even if the parent process exits. In this case, you will probably also want to redirect :in, :out and :err, for example to log files:
      Task.open(command,
        :daemonize => true,
        :in => File.open("/dev/null"),
        :out => File.open("/var/log/child.log", "a"),
        :err => File.open("/var/log/child.err", "a")
      )
- :env, :env!
  Provide an environment which will be used by the child process. Use :env to update the existing environment and :env! to replace it completely.
      Task.open(command, :env => {… => 'bar'})
- :umask
  Set the umask for the new process, as per File.umask.
- :chdir
  Set the current working directory for the new process, as per Dir.chdir.
- :preflight
  Similar to a proc-based command, but executed before exec'ing the given process.
      preflight = Proc.new do |command, options|
        # Set up some default state before exec'ing the new process.
      end
      Task.open(command, :preflight => preflight)
  The options hash is passed directly, so you can supply custom arguments to the preflight function.
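To make the difference between :env and :env! concrete, the following sketch applies the same two steps (ENV.update, optionally preceded by ENV.clear) inside a forked child and reports the resulting variable names back through a pipe. It uses only the standard library; child_env_keys, REXEC_PARENT and FOO are illustrative names, not part of RExec.

```ruby
# Hypothetical helper: fork a child, adjust its environment, and return the
# sorted list of environment variable names the child ends up with.
def child_env_keys(vars, replace: false)
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    ENV.clear if replace # what :env! is described as doing: start from nothing
    ENV.update(vars)     # what :env is described as doing: merge into the existing set
    writer.write(ENV.keys.sort.join("\n"))
    writer.close
    exit!(0)
  end
  writer.close
  keys = reader.read.split("\n")
  reader.close
  Process.wait(pid)
  keys
end

ENV["REXEC_PARENT"] = "1" # marker set in the parent so inheritance is visible

merged   = child_env_keys({ "FOO" => "bar" })
replaced = child_env_keys({ "FOO" => "bar" }, replace: true)

puts merged.include?("REXEC_PARENT") # inherited variable survives a merge
puts replaced.inspect                # only the supplied variable remains
```

Replacing the environment also removes variables such as PATH, so a command launched that way generally needs an absolute path or an explicit PATH entry.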
    # File 'lib/rexec/task.rb', line 257
    def self.open(command, options = {}, &block)
      cin, cout, cerr = …(options)
      spawn = options[:daemonize] ? :spawn_daemon : :spawn_child

      cid = self.send(spawn) do
        close_pipes(cin[WR], cout[RD], cerr[RD])

        STDIN.reopen(cin[RD]) if cin[RD]
        STDOUT.reopen(cout[WR]) if cout[WR]
        STDERR.reopen(cerr[WR]) if cerr[WR]

        if options[:env!]
          ENV.clear
          ENV.update(options[:env!])
        elsif options[:env]
          ENV.update(options[:env])
        end

        if options[:umask]
          File.umask(options[:umask])
        end

        if options[:chdir]
          Dir.chdir(options[:chdir])
        end

        if options[:preflight]
          preflight.call(command, options)
        end

        if command.respond_to? :call
          command.call
        elsif Array === command
          # If command is a Pathname, we need to convert it to an absolute path if possible,
          # otherwise if it is relative it might cause problems.
          if command[0].respond_to? :realpath
            command[0] = command[0].realpath
          end

          exec *command
        else
          if command.respond_to? :realpath
            command = command.realpath
          end

          exec command.to_s
        end
      end

      close_pipes(cin[RD], cout[WR], cerr[WR])

      task = Task.new(cin[WR], cout[RD], cerr[RD], cid)

      if block_given?
        begin
          yield task
          task.close_input
          return task.wait
        ensure
          task.close
        end
      else
        return task
      end
    end
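The core of the listing above is a fork, followed by reopening the standard streams onto pipes, followed by exec. The sketch below reproduces just that pattern with the standard library so the pipe bookkeeping is easier to follow. It is a simplified illustration under stated assumptions (stdin and stdout only, no options, no error handling), and run_with_pipes is a hypothetical name.

```ruby
# Hypothetical helper showing the fork / reopen / exec pattern with plain pipes.
# Returns the writable end of the child's stdin, the readable end of its stdout,
# and the child's pid.
def run_with_pipes(argv)
  in_r, in_w = IO.pipe
  out_r, out_w = IO.pipe

  pid = fork do
    # Child: close the parent's ends, then make the pipes its stdin and stdout.
    in_w.close
    out_r.close
    STDIN.reopen(in_r)
    STDOUT.reopen(out_w)
    exec(*argv) # several arguments, so no shell is involved
  end

  # Parent: close the child's ends, otherwise reads never see end-of-file.
  in_r.close
  out_w.close
  [in_w, out_r, pid]
end

stdin, stdout, pid = run_with_pipes(["tr", "a-z", "A-Z"])
stdin.write("hello\n")
stdin.close # signals end-of-input to the child
text = stdout.read
stdout.close
_pid, status = Process.wait2(pid)

puts text
puts status.success?
```

Closing the input before reading matches the order used in the block form of open, which calls close_input before wait.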
.running?(pid) ⇒ Boolean
Returns true if the given pid is a current process.
    # File 'lib/rexec/task.rb', line 120
    def self.running?(pid)
      gpid = Process.getpgid(pid) rescue nil

      return gpid != nil ? true : false
    end
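The same liveness check can be tried in isolation. The sketch below is a standalone variant, not the library method: pid_alive? is a hypothetical name, and it rescues only Errno::ESRCH instead of using a blanket rescue modifier.

```ruby
# Hypothetical helper: true if a process with this pid currently exists.
def pid_alive?(pid)
  Process.getpgid(pid)
  true
rescue Errno::ESRCH
  # Raised when no process with that pid exists.
  false
end

child = fork { exit!(0) }
Process.wait(child) # reap the child so its pid is released

puts pid_alive?(Process.pid) # the current process certainly exists
puts pid_alive?(child)       # the reaped child no longer does
```

A child that has exited but has not yet been waited for still occupies its pid, which is why #running? additionally consults the task's own status.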
.spawn_child(&block) ⇒ Object
Very simple method to spawn a child process.
    spawn_child do
      puts "Hello from child!"
    end
    # File 'lib/rexec/task.rb', line 166
    def self.spawn_child(&block)
      pid = fork do
        yield

        exit!(0)
      end

      return pid
    end
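A standalone sketch of the same pattern follows, with a pipe added so the parent can observe what the child did. The name spawn_child_sketch is hypothetical; the body mirrors the listing above, where exit! ends the child without running the parent's at_exit handlers.

```ruby
# Hypothetical stand-in for spawn_child: run the block in a forked child
# and return the child's pid to the parent.
def spawn_child_sketch
  fork do
    yield
    exit!(0) # skip at_exit handlers inherited from the parent
  end
end

reader, writer = IO.pipe

pid = spawn_child_sketch do
  reader.close
  writer.write("Hello from child!")
  writer.close
end

writer.close # the parent keeps only the read end
message = reader.read
reader.close
_pid, status = Process.wait2(pid)

puts message
puts status.exitstatus
```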
.spawn_daemon(&block) ⇒ Object
Very simple method to spawn a child daemon. A daemon is detached from the controlling tty, and thus is not killed when the parent process finishes.
    spawn_daemon do
      Dir.chdir("/")
      File.umask 0000
      puts "Hello from daemon!"
      sleep(600)
      puts "This code will not quit when parent process finishes..."
      puts "...but $stdout might be closed unless you set it to a file."
    end
    # File 'lib/rexec/task.rb', line 137
    def self.spawn_daemon(&block)
      pid_pipe = IO.pipe

      fork do
        Process.setsid
        exit if fork

        # Send the pid back to the parent
        pid_pipe[RD].close
        pid_pipe[WR].write(Process.pid.to_s)
        pid_pipe[WR].close

        yield

        exit(0)
      end

      pid_pipe[WR].close
      pid = pid_pipe[RD].read
      pid_pipe[RD].close

      return pid.to_i
    end
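The double fork with a pid pipe can be exercised on its own. The sketch below follows the same steps as the listing above, with two deliberate differences that are assumptions of this example rather than library behaviour: it uses exit! in the forked processes, and the parent reaps the short-lived intermediate process. The name spawn_daemon_sketch and the marker file are illustrative.

```ruby
require "tempfile"

# Hypothetical stand-in for spawn_daemon: returns the pid of the detached process.
def spawn_daemon_sketch
  reader, writer = IO.pipe

  middle = fork do
    reader.close
    Process.setsid     # new session, no controlling terminal
    exit!(0) if fork   # the intermediate process exits; the grandchild continues

    # Grandchild: report our pid to the original parent.
    writer.write(Process.pid.to_s)
    writer.close

    yield
    exit!(0)
  end

  writer.close
  daemon_pid = reader.read.to_i # returns once every write end is closed
  reader.close
  Process.wait(middle)          # reap the intermediate process
  daemon_pid
end

marker = Tempfile.new("daemon-sketch")
daemon_pid = spawn_daemon_sketch do
  File.write(marker.path, Process.pid.to_s)
end

# The daemon runs independently, so poll briefly for its marker.
50.times do
  break unless File.read(marker.path).empty?
  sleep 0.1
end

puts File.read(marker.path).to_i == daemon_pid
```

Because the daemon is a grandchild, the original process cannot wait on it directly; only the pid sent through the pipe identifies it.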
Instance Method Details
#close ⇒ Object
Close all connections to the child process.
    # File 'lib/rexec/task.rb', line 365
    def close
      begin
        self.class.dump_pipes(@output, @error)
      ensure
        self.class.close_pipes(@input, @output, @error)
      end
    end
#close_input ⇒ Object
Close input pipe to child process (if applicable).
    # File 'lib/rexec/task.rb', line 374
    def close_input
      @input.close if @input and !@input.closed?
    end
#kill(signal = "INT") ⇒ Object
Send a signal to the child process.
    # File 'lib/rexec/task.rb', line 379
    def kill(signal = "INT")
      if running?
        Process.kill(signal, @pid)
      else
        raise Errno::ECHILD
      end
    end
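What a signalled child looks like from the parent's side can be shown with Process.kill and Process.wait2 directly. This is a standalone sketch, not the Task API; signal_and_reap is a hypothetical helper, and the example sends KILL rather than the INT default only because KILL cannot be trapped or ignored, which keeps the outcome predictable.

```ruby
# Hypothetical helper: send a signal to a child and return its Process::Status.
def signal_and_reap(pid, signal = "INT")
  Process.kill(signal, pid)
  _pid, status = Process.wait2(pid)
  status
end

pid = Process.spawn("sleep", "30") # a child that would otherwise run for a while
status = signal_and_reap(pid, "KILL")

puts status.signaled?                # terminated by a signal, not a normal exit
puts Signal.signame(status.termsig)  # name of the signal that ended it
```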
#running? ⇒ Boolean
Returns true if the current task is still running.
    # File 'lib/rexec/task.rb', line 352
    def running?
      if self.class.running?(@pid)
        # The pid still seems alive, check that it isn't some other process using the same pid...
        @result_lock.synchronize do
          # If we haven't waited for it yet, it must be either a running process or a zombie...
          return @status != :stopped
        end
      end

      return false
    end
#stop ⇒ Object
Forcefully stop the child process.
    # File 'lib/rexec/task.rb', line 428
    def stop
      if running?
        close_input
        kill
      end
    end
#wait ⇒ Object
Wait for the child process to finish, return the exit status. This function can be called from multiple threads.
    # File 'lib/rexec/task.rb', line 389
    def wait
      begin_wait = false

      # Check to see if some other caller is already waiting on the result...
      @result_lock.synchronize do
        case @status
        when :waiting
          # If so, wait for the wait to finish...
          @result_available.wait(@result_lock)
        when :running
          # Else, mark that we should begin waiting...
          begin_wait = true
          @status = :waiting
        when :stopped
          return @result
        end
      end

      # If we should begin waiting (the first thread to wait)...
      if begin_wait
        begin
          # Wait for the result...
          _pid, @result = Process.wait2(@pid)
        end

        # The result is now available...
        @result_lock.synchronize do
          @status = :stopped
        end

        # Notify other threads...
        @result_available.broadcast()
      end

      # Return the result
      return @result
    end
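The idea that only the first caller performs the blocking Process.wait2 while later callers sleep on a condition variable can be sketched as a small standalone class. SharedWait is a hypothetical name and this is a variation on the listing above, not the library code: waiters re-check the status in a loop to tolerate spurious wakeups, and the result is published while holding the lock.

```ruby
# Hypothetical class: lets many threads wait for one child process,
# with only the first caller actually reaping it.
class SharedWait
  def initialize(pid)
    @pid = pid
    @status = :running
    @result = nil
    @lock = Mutex.new
    @available = ConditionVariable.new
  end

  def wait
    @lock.synchronize do
      case @status
      when :stopped
        return @result
      when :waiting
        # Another thread is already reaping; sleep until it publishes the result.
        @available.wait(@lock) until @status == :stopped
        return @result
      when :running
        @status = :waiting # this thread becomes the one that reaps
      end
    end

    # Block outside the lock so other callers can register as waiters.
    _pid, result = Process.wait2(@pid)

    @lock.synchronize do
      @result = result
      @status = :stopped
      @available.broadcast
    end
    result
  end
end

pid = fork { exit!(3) }
waiter = SharedWait.new(pid)
codes = 4.times.map { Thread.new { waiter.wait.exitstatus } }.map(&:value)
puts codes.inspect
```

Calling Process.wait2 twice on the same pid would raise Errno::ECHILD on the second call, which is the situation this coordination avoids.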