Class: RExec::Task

Inherits:
Object
  • Object
Defined in:
lib/rexec/task.rb

Overview

Represents a running process, either a child process or a background (daemon) process. Provides a simple, high-level interface for managing the life cycle of that process.

Constructor Details

#initialize(input, output, error, pid) ⇒ Task



# File 'lib/rexec/task.rb', line 323

def initialize(input, output, error, pid) # :nodoc:
	@input = input
	@output = output
	@error = error

	@pid = pid

	@result = nil
	@status = :running
	@result_lock = Mutex.new
	@result_available = ConditionVariable.new
end

Instance Attribute Details

#error ⇒ Object (readonly)

Standard error from the running task.



# File 'lib/rexec/task.rb', line 343

def error
  @error
end

#input ⇒ Object (readonly)

Standard input to the running task.



# File 'lib/rexec/task.rb', line 337

def input
  @input
end

#output ⇒ Object (readonly)

Standard output from the running task.



# File 'lib/rexec/task.rb', line 340

def output
  @output
end

#pid ⇒ Object (readonly)

The PID of the running task.



# File 'lib/rexec/task.rb', line 346

def pid
  @pid
end

#result ⇒ Object (readonly)

The status of the task after calling task.wait.



# File 'lib/rexec/task.rb', line 349

def result
  @result
end

Class Method Details

.open(command, options = {}, &block) ⇒ Object

Open a process. Similar to IO.popen, but provides a much more generic interface to stdin, stdout, stderr and the pid. We also attempt to tidy up as much as possible given some kind of error or exception. You may write to input, and read from output and error.

Typical usage looks similar to IO.popen:

    count = 0
    result = Task.open(["ls", "-la"], :passthrough => :err) do |task|
      count = task.output.read.split(/\n/).size
    end
    puts "Count: #{count}" if result.exitstatus == 0

The basic command can simply be a string, and this will be passed to Kernel#exec which will perform shell expansion on the arguments.

If the command passed is an array, this will be executed without shell expansion.

If a Proc (or anything that respond_to? :call) is provided, this will be executed in the child process. Here is an example of a long running background process:

    daemon = Proc.new do
      # Long running process
      sleep(1000)
    end

    task = Task.open(daemon, :daemonize => true, :in => …, :out => …, :err => …)
    exit(0)

Options

:passthrough, :in, :out, :err

The current process (e.g. ruby) has a set of existing pipes $stdin, $stdout and $stderr. These pipes can also be used by the child process. The passthrough option allows you to specify which pipes are retained from the parent process by the child.

Typically it is useful to passthrough $stderr, so that errors in the child process are printed out in the terminal of the parent process:

    Task.open(, :passthrough => :err)
    Task.open(, :passthrough => [:in, :out, :err])
    Task.open(, :passthrough => :all)

It is also possible to redirect to files, which can be useful if you want to keep a log file. The file must be opened for writing:

    Task.open(, :out => File.open("output.log", "a"))

The default behaviour is to create a new pipe, but any pipe (e.g. a network socket) could be used: Task.open(, :in => IO.pipe)
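The redirection to a file described above can be sketched with the Ruby standard library alone. This is only an illustration of the underlying mechanism (reopening the child's standard output before `exec`), not RExec's own code; the log file name and the `echo` command are assumptions made for the example, and a POSIX platform with `fork` is assumed.

```ruby
require "tmpdir"

# Sketch only: send a child's standard output to a log file.
logged = Dir.mktmpdir do |dir|
  log_path = File.join(dir, "output.log")   # hypothetical log file

  pid = fork do
    # In the child: point standard output at the file, then run the command.
    STDOUT.reopen(File.open(log_path, "a"))
    exec("echo", "logged by child")
  end

  Process.wait(pid)
  File.read(log_path)
end
```

The file is opened in append mode so that the child can write to it; the parent only reads it back after the child has finished.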

:daemonize

The process that is opened may be detached from the parent process. This allows the child process to exist even if the parent process exits. In this case, you will probably also want to redirect :in, :out and :err, for example to log files:

    Task.open(,
      :daemonize => true,
      :in => File.open("/dev/null"),
      :out => File.open("/var/log/child.log", "a"),
      :err => File.open("/var/log/child.err", "a")
    )

:env, :env!

Provide an environment which will be used by the child process. Use :env to update the existing environment and :env! to replace it completely:

    Task.open(, :env => => 'bar')
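The difference between updating and replacing the environment can be sketched as follows. This is a stand-alone illustration using `fork` and `ENV`, not RExec itself; `child_env_keys` and the variable `REXEC_DEMO_PARENT` are hypothetical names introduced for the example.

```ruby
# Hypothetical helper: fork a child, adjust its environment, and report
# the names of the variables the child ends up with.
def child_env_keys(env, replace)
  rd, wr = IO.pipe

  pid = fork do
    rd.close
    ENV.clear if replace   # like :env!, start from an empty environment
    ENV.update(env)        # like :env, merge into whatever is there
    wr.write(ENV.keys.sort.join("\0"))
    wr.close
    exit!(0)
  end

  wr.close
  keys = rd.read.split("\0")
  rd.close
  Process.wait(pid)
  keys
end

ENV["REXEC_DEMO_PARENT"] = "1"   # hypothetical variable set in the parent

merged   = child_env_keys({ "FOO" => "bar" }, false)
replaced = child_env_keys({ "FOO" => "bar" }, true)
```

With merging, the child sees both the inherited variable and `FOO`; with replacement, it sees only `FOO`. The parent's own environment is unaffected in both cases because the changes happen after the fork.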

:umask

Set the umask for the new process, as per File.umask.

:chdir

Set the current working directory for the new process, as per Dir.chdir.

:preflight

Similar to a proc based command, but executed before execing the given process.

    preflight = Proc.new do |command, options|
      # Setup some default state before exec the new process.
    end

    Task.open(, :preflight => preflight)

The options hash is passed directly so you can supply custom arguments to the preflight function.



# File 'lib/rexec/task.rb', line 257

def self.open(command, options = {}, &block)
	cin, cout, cerr = pipes_for_options(options)
	spawn = options[:daemonize] ? :spawn_daemon : :spawn_child

	cid = self.send(spawn) do
		close_pipes(cin[WR], cout[RD], cerr[RD])

		STDIN.reopen(cin[RD]) if cin[RD]
		STDOUT.reopen(cout[WR]) if cout[WR]
		STDERR.reopen(cerr[WR]) if cerr[WR]

		if options[:env!]
			ENV.clear
			ENV.update(options[:env!])
		elsif options[:env]
			ENV.update(options[:env])
		end

		if options[:umask]
			File.umask(options[:umask])
		end

		if options[:chdir]
			Dir.chdir(options[:chdir])
		end

		if options[:preflight]
			options[:preflight].call(command, options)
		end

		if command.respond_to? :call
			command.call
		elsif Array === command
			# If command is a Pathname, we need to convert it to an absolute path if possible,
			# otherwise if it is relative it might cause problems.
			if command[0].respond_to? :realpath
				command[0] = command[0].realpath
			end
			
			exec *command
		else
			if command.respond_to? :realpath
				command = command.realpath
			end
			
			exec command.to_s
		end
	end

	close_pipes(cin[RD], cout[WR], cerr[WR])

	task = Task.new(cin[WR], cout[RD], cerr[RD], cid)

	if block_given?
		begin
			yield task
			task.close_input
			return task.wait
		ensure
			task.close
		end
	else
		return task
	end
end
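The pipe wiring performed in the listing above can be sketched with the standard library alone. This is a simplified illustration rather than RExec's implementation: `run_with_pipes` is a hypothetical helper, standard error is left untouched, and a POSIX platform with `fork` and the `tr` utility is assumed.

```ruby
# Hypothetical helper (not part of RExec): run a command in a forked child,
# feed it some input, and collect its standard output and exit status.
def run_with_pipes(command, input_text)
  in_rd, in_wr = IO.pipe     # parent writes, child reads (child's stdin)
  out_rd, out_wr = IO.pipe   # child writes, parent reads (child's stdout)

  pid = fork do
    # The child closes the pipe ends that belong to the parent.
    in_wr.close
    out_rd.close

    STDIN.reopen(in_rd)
    STDOUT.reopen(out_wr)

    exec(*command)           # array form: no shell expansion
  end

  # The parent closes the pipe ends that belong to the child.
  in_rd.close
  out_wr.close

  in_wr.write(input_text)
  in_wr.close                # signals end of input to the child

  output = out_rd.read       # returns once the child closes its stdout
  out_rd.close

  _pid, status = Process.wait2(pid)
  [output, status]
end

output, status = run_with_pipes(["tr", "a-z", "A-Z"], "hello\n")
```

Each side closes the pipe ends it does not use. Otherwise the reader would never see end-of-file, because a write end would remain open somewhere. The sketch writes all input before reading, which is only safe for small inputs.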

.running?(pid) ⇒ Boolean

Returns true if the given pid belongs to a currently running process.

Returns:

  • (Boolean)


# File 'lib/rexec/task.rb', line 120

def self.running?(pid)
	gpid = Process.getpgid(pid) rescue nil

	return gpid != nil ? true : false
end
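The liveness check above relies on `Process.getpgid` raising an error for a pid that does not exist. A minimal stand-alone sketch of the same idea follows; `pid_alive?` is a hypothetical name, and the sketch rescues only `Errno::ESRCH` rather than every error.

```ruby
# Hypothetical stand-alone version of the liveness check.
def pid_alive?(pid)
  Process.getpgid(pid)
  true
rescue Errno::ESRCH
  # No process with this pid exists.
  false
end

pid = fork { exit!(0) }
Process.wait(pid)   # reap the child so that its pid is released

alive_self  = pid_alive?(Process.pid)
alive_child = pid_alive?(pid)
```

A child that has exited but has not yet been waited for (a zombie) still has a pid, so a check of this kind reports it as alive. The instance method #running? accounts for this by also consulting the task's own status.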

.spawn_child(&block) ⇒ Object

Very simple method to spawn a child process.

    spawn_child do
      puts "Hello from child!"
    end



# File 'lib/rexec/task.rb', line 166

def self.spawn_child(&block)
	pid = fork do
		yield

		exit!(0)
	end

	return pid
end

.spawn_daemon(&block) ⇒ Object

Very simple method to spawn a child daemon. A daemon is detached from the controlling tty, and thus is not killed when the parent process finishes.

    spawn_daemon do
      Dir.chdir("/")
      File.umask 0000

      puts "Hello from daemon!"
      sleep(600)
      puts "This code will not quit when parent process finishes…"
      puts "…but $stdout might be closed unless you set it to a file."
    end



# File 'lib/rexec/task.rb', line 137

def self.spawn_daemon(&block)
	pid_pipe = IO.pipe

	fork do
		Process.setsid
		exit if fork

		# Send the pid back to the parent
		pid_pipe[RD].close
		pid_pipe[WR].write(Process.pid.to_s)
		pid_pipe[WR].close

		yield

		exit(0)
	end

	pid_pipe[WR].close
	pid = pid_pipe[RD].read
	pid_pipe[RD].close

	return pid.to_i
end
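The double-fork technique used in the listing above can be sketched on its own. `spawn_detached` is a hypothetical helper, not RExec's method. Unlike the listing, this sketch also waits for the intermediate child, so that it does not linger as a zombie, and uses `exit!` throughout. A POSIX platform is assumed.

```ruby
# Hypothetical helper: run the block in a detached grandchild process and
# return the grandchild's pid to the caller.
def spawn_detached
  rd, wr = IO.pipe

  child = fork do
    rd.close
    Process.setsid        # new session, no controlling terminal
    exit!(0) if fork      # the intermediate child exits immediately

    # Only the grandchild reaches this point; report its pid to the caller.
    wr.write(Process.pid.to_s)
    wr.close

    yield
    exit!(0)
  end

  wr.close
  Process.wait(child)     # reap the intermediate child
  pid = rd.read.to_i      # returns once every write end has been closed
  rd.close
  pid
end

daemon_pid = spawn_detached { sleep 0.1 }
```

Because the grandchild is not a direct child of the caller, the caller cannot wait on `daemon_pid`. That is why the pid has to be sent back through a pipe instead of being taken from the return value of `fork`.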

Instance Method Details

#close ⇒ Object

Close all connections to the child process.



# File 'lib/rexec/task.rb', line 365

def close
	begin
		self.class.dump_pipes(@output, @error)
	ensure
		self.class.close_pipes(@input, @output, @error)
	end
end

#close_input ⇒ Object

Close input pipe to child process (if applicable).



# File 'lib/rexec/task.rb', line 374

def close_input
	@input.close if @input and !@input.closed?
end

#kill(signal = "INT") ⇒ Object

Send a signal to the child process. Raises Errno::ECHILD if the task is no longer running.



# File 'lib/rexec/task.rb', line 379

def kill(signal = "INT")
	if running?
		Process.kill(signal, @pid)
	else
		raise Errno::ECHILD
	end
end
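Signalling a child, as the method above does with `Process.kill`, can be sketched as follows. This is a stand-alone illustration; the handler, the "ready" handshake over a pipe and the exit codes are assumptions made for the example, not part of RExec.

```ruby
rd, wr = IO.pipe

pid = fork do
  rd.close
  # The child decides how to react to the signal; here it exits cleanly.
  Signal.trap("INT") { exit!(0) }
  wr.puts "ready"
  wr.close

  sleep 60                   # stands in for a long-running task
  exit!(1)                   # only reached if no signal arrives
end

wr.close
rd.gets                      # wait until the child has installed its handler
rd.close

Process.kill("INT", pid)     # "INT" is also the default signal of #kill
_pid, status = Process.wait2(pid)
```

The handshake avoids sending the signal before the child is ready for it. Without a handler, the way the child terminates depends on its default handling of the signal.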

#running? ⇒ Boolean

Returns true if the current task is still running.

Returns:

  • (Boolean)


# File 'lib/rexec/task.rb', line 352

def running?
	if self.class.running?(@pid)
		# The pid still seems alive, check that it isn't some other process using the same pid...
		@result_lock.synchronize do
			# If we haven't waited for it yet, it must be either a running process or a zombie...
			return @status != :stopped
		end
	end
	
	return false
end

#stop ⇒ Object

Forcefully stop the child process.



# File 'lib/rexec/task.rb', line 428

def stop
	if running?
		close_input
		kill
	end
end

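#wait ⇒ Object

Wait for the child process to finish and return its exit status (the Process::Status obtained from Process.wait2). This method can be called from multiple threads; only the first caller waits on the process, and the others receive the same result.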



# File 'lib/rexec/task.rb', line 389

def wait
	begin_wait = false
	
	# Check to see if some other caller is already waiting on the result...
	@result_lock.synchronize do
		case @status
		when :waiting
			# If so, wait for the wait to finish...
			@result_available.wait(@result_lock)
		when :running
			# Else, mark that we should begin waiting...
			begin_wait = true
			@status = :waiting
		when :stopped
			return @result
		end
	end
	
	# If we should begin waiting (the first thread to wait)...
	if begin_wait
		begin
			# Wait for the result...
			_pid, @result = Process.wait2(@pid)
		end
		
		# The result is now available...
		@result_lock.synchronize do
			@status = :stopped
		end
		
		# Notify other threads...
		@result_available.broadcast()
	end
	
	# Return the result
	return @result
end
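The coordination between threads in the listing above can be sketched with a small stand-alone class. `SharedWait` is a hypothetical name and this is not RExec's implementation: compared with the listing, the sketch re-checks the status in a loop after waking and broadcasts while holding the lock, as a conservative guard against spurious wakeups.

```ruby
# Hypothetical class: lets several threads wait for the same child process
# while only one of them actually calls Process.wait2.
class SharedWait
  def initialize(pid)
    @pid = pid
    @status = :running
    @result = nil
    @lock = Mutex.new
    @available = ConditionVariable.new
  end

  def wait
    @lock.synchronize do
      case @status
      when :stopped
        return @result
      when :waiting
        # Another thread is already waiting on the process; sleep until
        # it publishes the result.
        @available.wait(@lock) until @status == :stopped
        return @result
      else
        @status = :waiting   # this thread performs the real wait
      end
    end

    _pid, result = Process.wait2(@pid)

    @lock.synchronize do
      @result = result
      @status = :stopped
      @available.broadcast
    end

    result
  end
end

pid = fork { sleep 0.1; exit!(7) }
waiter = SharedWait.new(pid)
codes = 4.times.map { Thread.new { waiter.wait.exitstatus } }.map(&:value)
```

A process can only be waited for once, so the result is cached and every later call returns the stored status without touching the process again.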