Class: Nodule::Process

Inherits:
Base
  • Object
show all
Defined in:
lib/nodule/process.rb

Direct Known Subclasses

Cassandra

Instance Attribute Summary collapse

Attributes inherited from Base

#prefix, #read_count, #readers, #running

Instance Method Summary collapse

Methods inherited from Base

#add_reader, #add_readers, #join_topology!, #read_until, #run_readers, #verbose, #wait_with_backoff

Constructor Details

#initialize(*argv) ⇒ Process

Returns a new instance of Process.

Parameters:

  • command, (Array)

    argv

  • opts (Hash)


16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/nodule/process.rb', line 16

# Build a process wrapper from a spawn-style argument list.
#
# A trailing Hash is taken as the options; after it has been removed, a
# leading Hash is taken as the child's environment. Whatever remains is kept
# as the command and its arguments. The :stdout and :stderr options are
# removed from the options Hash (defaulting to :capture) before the rest is
# passed to the superclass constructor.
#
# @param argv [Array] optional env Hash, command and arguments, optional opts Hash
def initialize(*argv)
  # order matters: the options are popped first, so a lone Hash counts as options
  options = argv.last.is_a?(Hash) ? argv.pop : {}
  environment = argv.first.is_a?(Hash) ? argv.shift : {}

  @opts = options
  @env = environment
  @argv = argv
  @pid = nil
  @status = nil
  # placeholder values so started and ended are never nil
  @started = -1
  @ended = -2
  @stdout_opts = @opts.delete(:stdout) || :capture
  @stderr_opts = @opts.delete(:stderr) || :capture

  super(@opts)
end

Instance Attribute Details

#argvObject (readonly)

Returns the value of attribute argv.



11
12
13
# File 'lib/nodule/process.rb', line 11

# Reader for @argv, the command and arguments stored by the constructor.
def argv
  @argv
end

#endedObject (readonly)

Returns the value of attribute ended.



11
12
13
# File 'lib/nodule/process.rb', line 11

# Reader for @ended. The constructor initializes it to -2; waitpid replaces it
# with Time.now once the child has been reaped.
def ended
  @ended
end

#pidObject (readonly)

Returns the value of attribute pid.



11
12
13
# File 'lib/nodule/process.rb', line 11

# Reader for @pid: nil until run stores the value returned by spawn, and nil
# again after reset.
def pid
  @pid
end

#startedObject (readonly)

Returns the value of attribute started.



11
12
13
# File 'lib/nodule/process.rb', line 11

# Reader for @started. The constructor initializes it to -1; run replaces it
# with Time.now right after spawning the child.
def started
  @started
end

#topologyObject

Returns the value of attribute topology.



12
13
14
# File 'lib/nodule/process.rb', line 12

# Reader for @topology.
# NOTE(review): @topology is not assigned in the visible part of this class;
# presumably Base#join_topology! or the attribute writer sets it -- confirm.
def topology
  @topology
end

Instance Method Details

#_apply_topology(arg) ⇒ Object

Convert symbol arguments to the to_s result of a topology item if it exists, run procs, and flatten enumerables. So :foobar will access the topology’s entry for :foobar and call .to_s on it, and proc { “abc” } will become “abc”.

‘if=’, :foobar

will resolve :foobar (this is recursive) and join all the results with no padding

anything left unmatched will be coerced into a string with .to_s



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/nodule/process.rb', line 36

# Resolve one command-line argument against the topology.
#
# * Symbols are looked up in @topology and replaced by the entry's to_s.
# * Callables (procs, lambdas, Method objects) are invoked with no arguments
#   and their result is resolved recursively, so proc { "abc" } becomes "abc".
# * Anything that responds to map is resolved element by element and joined
#   with no padding, so ["if=", :foo] becomes "if=value".
# * Everything else is coerced with to_s.
#
# @param arg [Object] a single argument or a (nested) list of arguments
# @return [String] the resolved argument
# @raise [TopologyUnknownSymbolError] when a Symbol has no topology entry
def _apply_topology(arg)
  # only symbols are auto-translated to resource strings, String keys intentionally do not match
  if arg.kind_of? Symbol
    unless @topology.has_key? arg
      raise TopologyUnknownSymbolError.new "Unresolvable topology symbol, :#{arg}"
    end
    @topology[arg].to_s
  # callables are run and whatever they return is resolved like any other argument
  elsif arg.respond_to? :call
    _apply_topology(arg.call)
  # sub-lists are recursed then joined with no padding
  elsif arg.respond_to? :map
    arg.map { |item| _apply_topology(item) }.join('')
  else
    arg.to_s
  end
end

#_kill(sig) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/nodule/process.rb', line 102

# Deliver a signal to the child process.
#
# @param sig [Integer] a positive signal number
# @return whatever ::Process.kill returns
# @raise [ArgumentError] when sig is not greater than zero
# @raise [ProcessNotRunningError] when no pid is known
def _kill(sig)
  # Negative signals are rejected on purpose: they would always yield ESRCH for
  # child processes, which by definition are not process group leaders (and the
  # group leader's pid is usually what doubles as the process group id).
  # See kill(2).
  signal_ok = sig > 0
  if !signal_ok
    raise ArgumentError.new "negative signals are wrong and unsupported"
  end
  if !@pid
    raise ProcessNotRunningError.new
  end

  target = @pid
  verbose "Sending signal #{sig} to process #{target}."
  # ESRCH is deliberately not rescued: it would mean a bug on our side, and
  # callers are expected to have checked that the process is running already.
  ::Process.kill(sig, target)
end

#clear_stderr!Object

Clear the stderr buffer and reset the counter. proxies: Nodule::Base.clear!



298
299
300
# File 'lib/nodule/process.rb', line 298

# Delegates to clear! on @stderr_handler, the Nodule::LineIO that run creates
# for the child's stderr pipe.
def clear_stderr!
  @stderr_handler.clear!
end

#clear_stdout!Object Also known as: clear!

Clear the stdout buffer and reset the counter. proxies: Nodule::Base.clear!



253
254
255
# File 'lib/nodule/process.rb', line 253

# Delegates to clear! on @stdout_handler, the Nodule::LineIO that run creates
# for the child's stdout pipe.
def clear_stdout!
  @stdout_handler.clear!
end

#closeObject

Close all of the pipes.



352
353
354
355
356
# File 'lib/nodule/process.rb', line 352

# Close the parent's ends of the stdin, stdout and stderr pipes, in that order.
# Closing is best effort: any StandardError (including the pipes never having
# been created) is swallowed so that cleanup cannot fail.
#
# @return [nil]
def close
  [@stdin, @stdout, @stderr].each do |pipe|
    begin
      pipe.close
    rescue StandardError
      # deliberately ignored, see above
    end
  end
  nil
end

#done?Boolean

Returns:

  • (Boolean)


202
203
204
205
206
207
# File 'lib/nodule/process.rb', line 202

# Report whether the child has been reaped.
#
# Polls via waitpid when no status has been collected yet, and polls a second
# time if the first poll still produced no status.
#
# @return [Boolean] true once a status is available or the poll returned our pid
# @raise [ProcessNotRunningError] when called before run
def done?
  if !@pid
    raise ProcessNotRunningError.new "#@prefix called .done? before .run."
  end

  waitpid if !@status

  if @status
    true
  else
    waitpid == @pid
  end
end

#elapsedObject

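Return the elapsed time in seconds (the end time minus the start time; subtracting two Time objects yields a Float number of seconds, not milliseconds).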



212
213
214
215
216
# File 'lib/nodule/process.rb', line 212

# Difference between the recorded end and start values (@ended - @started).
# Once run and waitpid have stored Time objects in both, this is the run time
# as a Float number of seconds.
#
# @raise [ProcessNotRunningError] when @started is nil or false
# @raise [ProcessStillRunningError] when @ended is nil or false
def elapsed
  if !@started
    raise ProcessNotRunningError.new
  end
  if !@ended
    raise ProcessStillRunningError.new
  end

  finish, start = @ended, @started
  finish - start
end

#inspectObject

Returns to_hash.inspect



382
383
384
# File 'lib/nodule/process.rb', line 382

# Returns the inspect string of the Hash built by to_hash.
def inspect
  to_hash.inspect
end

#iodone?Object

Check whether the process has exited or been killed and cleaned up. Calls waitpid2 behind the scenes if necessary. Throws ProcessNotRunningError if called before .run.



201
# File 'lib/nodule/process.rb', line 201

# iodone? is a second name for done?.
alias :iodone? :done?

#iowaitObject

Call waitpid and block until the process exits or timeout is reached.



135
# File 'lib/nodule/process.rb', line 135

# iowait is a second name for wait.
alias :iowait :wait

Write to the child process’s stdin using IO.print.

Parameters:

  • see (String)

    IO.print



313
314
315
# File 'lib/nodule/process.rb', line 313

# Forwards all arguments to print on @stdin, the write end of the pipe whose
# read end run passes to the child as its standard input.
def print(*args)
  @stdin.print(*args)
end

#puts(*args) ⇒ Object

Write to the child process’s stdin using IO.puts.

Parameters:

  • see (String)

    IO.puts



321
322
323
# File 'lib/nodule/process.rb', line 321

# Forwards all arguments to puts on @stdin, the write end of the pipe whose
# read end run passes to the child as its standard input.
def puts(*args)
  @stdin.puts(*args)
end

#require_stderr_count(count, max_sleep = 10) ⇒ Object

Proxies to stderr require_read_count.



305
306
307
# File 'lib/nodule/process.rb', line 305

# Delegates to require_read_count on the stderr handler, passing count and
# max_sleep through unchanged.
# NOTE(review): the meaning and unit of max_sleep are defined by
# require_read_count, which is not visible here -- confirm there.
def require_stderr_count(count, max_sleep=10)
  @stderr_handler.require_read_count count, max_sleep
end

#require_stdout_count(count, max_sleep = 10) ⇒ Object Also known as: require_read_count

Proxies to stdout require_read_count.



261
262
263
# File 'lib/nodule/process.rb', line 261

# Delegates to require_read_count on the stdout handler, passing count and
# max_sleep through unchanged.
# NOTE(review): the meaning and unit of max_sleep are defined by
# require_read_count, which is not visible here -- confirm there.
def require_stdout_count(count, max_sleep=10)
  @stdout_handler.require_read_count count, max_sleep
end

#resetObject

Clear all of the state and prepare to be able to .run again. Raises ProcessStillRunningError if the child is still running.



94
95
96
97
98
99
100
# File 'lib/nodule/process.rb', line 94

# Clear the per-run state so that run can be called again.
#
# Stops both output handlers, closes the pipes, and forgets the pid and the
# collected exit status. The status has to be cleared as well: done?, status
# and waitpid all treat a non-nil @status as "this run has already been
# reaped", so a stale value would make the next run look finished immediately.
#
# @raise [ProcessStillRunningError] if the child is still running
def reset
  raise ProcessStillRunningError.new unless done?
  @stdout_handler.stop
  @stderr_handler.stop
  close
  @status = nil
  @pid = nil
end

#runObject



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/nodule/process.rb', line 54

# Spawn the child process with stdin, stdout and stderr connected to pipes.
#
# Every stored argument is resolved through _apply_topology first. A
# Nodule::LineIO handler is started on the read end of the stdout and stderr
# pipes, the child is spawned with @env, and the parent's copies of the
# child-side pipe ends are closed afterwards.
#
# @raise [ProcessAlreadyRunningError] when a pid is known and that run has not
#   been reaped yet
def run
  # A previous run counts as finished only when both timestamps are real Time
  # values and the end is not before the start. Before the first reap @ended
  # still holds its Integer placeholder, and comparing a Time with an Integer
  # raises ArgumentError, so the types are checked before comparing.
  finished = @started.is_a?(Time) && @ended.is_a?(Time) && @ended >= @started
  raise ProcessAlreadyRunningError.new if @pid && !finished

  argv = @argv.map { |arg| _apply_topology(arg) }

  # Simply calling spawn with *argv isn't good enough, it really needs the command
  # to be a completely separate argument. This is likely due to a bug in spawn().
  command = argv.shift

  verbose "Spawning: #{command} #{argv.join(' ')}"

  @stdin_r, @stdin    = IO.pipe
  @stdout,  @stdout_w = IO.pipe
  @stderr,  @stderr_w = IO.pipe

  @stdout_handler = Nodule::LineIO.new :io => @stdout, :reader => @stdout_opts, :topology => @topology, :run => true
  @stderr_handler = Nodule::LineIO.new :io => @stderr, :reader => @stderr_opts, :topology => @topology, :run => true

  @pid = spawn(@env, command, *argv,
    :in  => @stdin_r,
    :out => @stdout_w,
    :err => @stderr_w,
  )

  @started = Time.now

  # the child holds its own copies now; keeping ours open would prevent EOF
  @stdin_r.close
  @stdout_w.close
  @stderr_w.close

  super
end

#statusObject

Return Process::Status as returned by Process::waitpid2.



190
191
192
193
194
# File 'lib/nodule/process.rb', line 190

# Return the collected exit status, polling once through waitpid when none has
# been collected yet. The result is nil when that poll does not produce a
# status.
#
# @raise [ProcessNotRunningError] when called before run
def status
  if !@pid
    raise ProcessNotRunningError.new "#@prefix called .status before .run."
  end

  waitpid if !@status
  @status
end

#stderrArray{String}

Get all currently captured stderr. Does not clear the buffer. proxies: Nodule::Base.output

Returns:



281
282
283
# File 'lib/nodule/process.rb', line 281

# Delegates to output on @stderr_handler, the Nodule::LineIO that run creates
# for the child's stderr pipe.
def stderr
  @stderr_handler.output
end

#stderr!Array{String}

Get all currently captured stderr. Resets the buffer and counts. proxies: Nodule::Base.output!

Returns:



290
291
292
# File 'lib/nodule/process.rb', line 290

# Delegates to output! on @stderr_handler, the Nodule::LineIO that run creates
# for the child's stderr pipe.
def stderr!
  @stderr_handler.output!
end

#stderr?TrueClass, FalseClass

Returns whether or not any stderr has been captured. Will raise an exception if capture is not enabled. proxies: Nodule::Base.output?

Returns:

  • (TrueClass, FalseClass)


272
273
274
# File 'lib/nodule/process.rb', line 272

# Delegates to output? on @stderr_handler, the Nodule::LineIO that run creates
# for the child's stderr pipe.
def stderr?
  @stderr_handler.output?
end

#stderr_pipeIO

Access the STDERR pipe IO object of the handle.

Returns:

  • (IO)


345
346
347
# File 'lib/nodule/process.rb', line 345

# Returns @stderr, the read end of the stderr pipe created in run (nil before
# the first run).
def stderr_pipe
  @stderr
end

#stdin_pipeIO

Access the STDIN pipe IO object of the handle.

Returns:

  • (IO)


329
330
331
# File 'lib/nodule/process.rb', line 329

# Returns @stdin, the write end of the stdin pipe created in run (nil before
# the first run).
def stdin_pipe
  @stdin
end

#stdoutArray{String} Also known as: output

Get all currently captured stdout. Does not clear the buffer. proxies: Nodule::Base.output

Returns:



234
235
236
# File 'lib/nodule/process.rb', line 234

# Delegates to output on @stdout_handler, the Nodule::LineIO that run creates
# for the child's stdout pipe.
def stdout
  @stdout_handler.output
end

#stdout!Array{String} Also known as: output!

Get all currently captured stdout. Resets the buffer and counts. proxies: Nodule::Base.output!

Returns:



244
245
246
# File 'lib/nodule/process.rb', line 244

# Delegates to output! on @stdout_handler, the Nodule::LineIO that run creates
# for the child's stdout pipe.
def stdout!
  @stdout_handler.output!
end

#stdout?TrueClass, FalseClass Also known as: output?

Returns whether or not any stdout has been captured. Will raise an exception if capture is not enabled. proxies: Nodule::Base.output?

Returns:

  • (TrueClass, FalseClass)


224
225
226
# File 'lib/nodule/process.rb', line 224

# Delegates to output? on @stdout_handler, the Nodule::LineIO that run creates
# for the child's stdout pipe.
def stdout?
  @stdout_handler.output?
end

#stdout_pipeIO

Access the STDOUT pipe IO object of the handle.

Returns:

  • (IO)


337
338
339
# File 'lib/nodule/process.rb', line 337

# Returns @stdout, the read end of the stdout pipe created in run (nil before
# the first run).
def stdout_pipe
  @stdout
end

#stopObject

Send SIGTERM (15) to the child process, sleep 1/20 of a second (0.05s), then call waitpid. For well-behaving processes, this should be enough to make it stop. Returns true/false just like done?



161
162
163
164
165
166
167
168
169
# File 'lib/nodule/process.rb', line 161

# Ask the child to terminate with SIGTERM (15).
#
# Does nothing but report success when the child has already been reaped.
# Otherwise the signal is sent, both output handlers are stopped, and after a
# 0.05 second pause the child is polled once with waitpid before the pipes are
# closed.
#
# @return [Boolean] true when the child has been reaped, false when it was
#   still running at the time of the poll
# @raise [ProcessNotRunningError] when called before run (raised by done?)
def stop
  return true if done?

  _kill 15 # never negative!
  @stdout_handler.stop
  @stderr_handler.stop
  # give the child a moment to act on the signal before polling for it
  sleep 0.05
  # remember the outcome: close runs last and its result must not become ours
  reaped = (@pid == waitpid)
  close
  reaped
end

#stop!Object

Send SIGKILL (9) to the child process, sleep 1/10 of a second, then call waitpid and return. Returns true/false just like done?



175
176
177
178
179
180
181
182
183
184
185
# File 'lib/nodule/process.rb', line 175

# Forcefully terminate the child with SIGKILL (9).
#
# Does nothing but report success when the child has already been reaped.
# Otherwise the signal is sent, stop! is called on both output handlers, and
# after a 0.1 second pause the child is polled once with waitpid before the
# pipes are closed.
#
# @return [Boolean] true when the child has been reaped, false when it was
#   still running at the time of the poll
# @raise [ProcessNotRunningError] when no pid is known
def stop!
  raise ProcessNotRunningError.new unless @pid
  return true if done?

  _kill 9 # never negative!
  @stdout_handler.stop!
  @stderr_handler.stop!
  # give the kernel a moment to tear the child down before polling for it
  sleep 0.1
  # remember the outcome: close runs last and its result must not become ours
  reaped = (@pid == waitpid)
  close
  reaped
end

#to_hashObject

Return most of the data about the process as a hash. This is safe to call at any point.



361
362
363
364
365
366
367
368
369
370
# File 'lib/nodule/process.rb', line 361

# Summarize the process as a Hash with the keys :argv, :started, :ended,
# :elapsed, :pid and :retval.
#
# :started and :ended are converted with to_i, :elapsed comes from elapsed,
# and :retval is the exit status of the collected Process::Status, or nil
# while no status has been collected.
#
# @return [Hash]
def to_hash
  {
    :argv    => @argv,
    :started => @started.to_i,
    :ended   => @ended.to_i,
    :elapsed => elapsed,
    :pid     => @pid,
    # guard the nil case explicitly; exitstatus is itself nil when the child
    # did not exit normally
    :retval  => (@status.nil? ? nil : @status.exitstatus)
  }
end

#to_sObject

Returns the command as a string.



375
376
377
# File 'lib/nodule/process.rb', line 375

# Joins @argv with single spaces. The arguments are used as stored, i.e.
# without the topology substitution that run applies.
def to_s
  @argv.join(' ')
end

#wait(timeout = nil) ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/nodule/process.rb', line 136

# Wait for the child to exit.
#
# With no timeout (nil, false or 0) this blocks in waitpid until the child
# exits. Otherwise the child is polled without blocking from inside
# wait_with_backoff (inherited from Base).
# NOTE(review): wait_with_backoff presumably re-runs the block until it
# returns true or the timeout has passed -- confirm against Base.
#
# @param timeout [Numeric, nil] how long to keep polling; nil or 0 blocks
# @return [Integer, nil] the pid returned by the waitpid call that reaped the
#   child, or nil when no call made here reaped it
def wait(timeout=nil)
  # block indefinitely on nil/0 timeout; 0 is truthy in Ruby, so it needs its own test
  return waitpid(0) if !timeout || timeout == 0

  pid = nil
  wait_with_backoff timeout do
    # Poll at most once per round and keep the result, so that the pid of the
    # call that actually reaped the child is what gets returned.
    pid = waitpid(::Process::WNOHANG) unless @status
    @status ? true : false
  end

  pid
end

#waitpid(flag = ::Process::WNOHANG) ⇒ Object

Call Process.waitpid2, save the status (accessible with obj.status) and return just the pid value returned by waitpid2.



120
121
122
123
124
125
126
127
128
129
130
# File 'lib/nodule/process.rb', line 120

# Poll (or, with flag 0, block on) the child through Process.waitpid2, storing
# the returned status in @status.
#
# @param flag [Integer] flags handed to waitpid2; non-blocking by default
# @return [Integer, nil] the pid reported by waitpid2, nil when it reported nothing
# @raise [ProcessNotRunningError] when no pid is known or a status has
#   already been collected
def waitpid(flag=::Process::WNOHANG)
  if !@pid
    raise ProcessNotRunningError.new "pid is not known"
  end
  if @status
    raise ProcessNotRunningError.new "process seems to have exited #{@status.inspect}"
  end

  reaped, @status = ::Process.waitpid2(@pid, flag)

  # this is as accurate as we can get, and it will generally be good enough for test work
  if reaped == @pid
    @ended = Time.now
  end

  reaped
end