Class: JobPool::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/job_pool/job.rb

Overview

A job keeps track of the child process that gets forked. job is the Ruby data structure, process is the Unix process.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pool, command, options = {}) ⇒ Job

internal: Use [JobPool#launch], don’t call this method directly.

Starts a process.

## Parameters

  • pool [JobPool]: The pool that will contain this job.

  • command [String, Array]: The command to run. Can be specified either

    as a string or an array of arguments for Process.spawn.
    

## Options

  • stdin [IO, String]: The child’s input. If an IO object isn’t supplied,

    a StringIO will be created by calling the parameter's to_s method.
    
  • stdout [IO]: the IO object to receive the child’s output.

  • stderr [IO]: the IO object to receive the child’s stderr.

  • timeout [seconds]: the number of seconds to wait before killing the job.

If stdin, stdout, or stderr are omitted, an empty StringIO will be created. If output and error are StringIOs, the [output] method will return the child’s stdout, and [error] will return its stderr.

## Examples

  • Simple invocation: ‘job = Job.new pool, ’echo hi’‘

  • Redirect output to a file: ‘Job.new pool, ’wkhtmltopdf’, stdout: File.new(‘/tmp/out.pdf’, ‘w’)‘

  • Passing an array and options: ‘Job.new pool, [’cat’, ‘/tmp/infile’, true]‘



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/job_pool/job.rb', line 45

def initialize pool, command, options={}
  @start_time = Time.now
  @pool   = pool
  @killed = false
  @timed_out = false

  @stdin  = options[:stdin] || StringIO.new
  @stdin  = StringIO.new(@stdin.to_s) unless @stdin.respond_to?(:readpartial)
  @stdout = options[:stdout] || StringIO.new
  @stderr  = options[:stderr] || StringIO.new

  @chin, @chout, @cherr, @child = Open3.popen3(*command)
  @chout.binmode

  @pool._add(self)

  @thrin  = Thread.new { drain(@stdin, @chin) }
  @throut = Thread.new { drain(@chout, @stdout) }
  @threrr = Thread.new { drain(@cherr, @stderr) }

  # ensure cleanup is called when the child exits. (crazy that this requires a whole new thread!)
  @cleanup_thread = Thread.new do
    if options[:timeout]
      unless @child.join(timeout)
        @timed_out = true
        kill
      end
    else
      @child.join
    end
    stop
  end
end

Instance Attribute Details

#start_timeObject (readonly)

start and finish times of this job



14
15
16
# File 'lib/job_pool/job.rb', line 14

# Returns the Time captured at the top of the constructor (Time.now when
# the job object was created).
def start_time
  @start_time
end

#stderrObject (readonly)

fds for child’s stdin/stdout/stderr



15
16
17
# File 'lib/job_pool/job.rb', line 15

# Returns the object that receives the child's stderr: the :stderr option
# given to the constructor, or the StringIO created when it was omitted.
def stderr
  @stderr
end

#stdinObject (readonly)

fds for child’s stdin/stdout/stderr



15
16
17
# File 'lib/job_pool/job.rb', line 15

# Returns the object the child's input is read from. The constructor stores
# the :stdin option as-is when it responds to readpartial; otherwise it
# stores a StringIO built from the option's to_s (empty when omitted).
def stdin
  @stdin
end

#stdoutObject (readonly)

fds for child’s stdin/stdout/stderr



15
16
17
# File 'lib/job_pool/job.rb', line 15

# Returns the object that receives the child's stdout: the :stdout option
# given to the constructor, or the StringIO created when it was omitted.
def stdout
  @stdout
end

#stop_timeObject (readonly)

start and finish times of this job



14
15
16
# File 'lib/job_pool/job.rb', line 14

# Returns the Time recorded by _cleanup, or nil if _cleanup has not run yet.
def stop_time
  @stop_time
end

Instance Method Details

#_child_threadObject

only meant to be used by the ProcessMonitor



142
143
144
# File 'lib/job_pool/job.rb', line 142

# Only meant to be used by the ProcessMonitor.
# Returns @child, the last of the four values returned by Open3.popen3 in
# the constructor (the thread that waits on the spawned process).
def _child_thread
  @child
end

#_cleanupObject

may only be called once, synchronized by stop()



147
148
149
150
# File 'lib/job_pool/job.rb', line 147

# Stamps the job's stop time. Must run at most once per job; a second call
# raises a RuntimeError because a stop time is already recorded.
def _cleanup
  if @stop_time
    raise "Someone else already cleaned up this job?!"
  end

  @stop_time = Time.now
end

#_deactivateObject

returns true if process was previously active. must be externally synchronized. TODO: this is a terrible api. gotta be a way to clean it up.



154
155
156
157
158
# File 'lib/job_pool/job.rb', line 154

# Marks the job inactive and reports whether this call made the change.
# Returns true if the job had not been deactivated before, false otherwise.
# Performs no locking of its own; callers must synchronize externally.
def _deactivate
  already_inactive = @inactive
  @inactive = true
  !already_inactive
end

#errorObject



96
97
98
# File 'lib/job_pool/job.rb', line 96

# Returns the text collected in the stderr object by calling its #string
# method. This works with the default StringIO; an object without #string
# (for example a File passed as :stderr) will raise NoMethodError.
def error
  @stderr.string
end

#finished?Boolean

Returns:

  • (Boolean)


100
101
102
# File 'lib/job_pool/job.rb', line 100

# True once a stop time has been recorded for this job, false before that.
def finished?
  !@stop_time.nil?
end

#kill(seconds_until_panic = 2) ⇒ Object

kill-o-zaps the phantom process now (using -9 if needed), then waits until it’s truly gone



118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/job_pool/job.rb', line 118

# Terminates the child process and waits until it is truly gone.
#
# Sends TERM first; if the child has not exited within
# +seconds_until_panic+ seconds, sends KILL. Finally joins the cleanup
# thread (unless called from that thread) so the job is fully cleaned up
# before this method returns.
#
# seconds_until_panic - seconds to wait after TERM before escalating to KILL.
def kill seconds_until_panic=2
  @killed = true

  if @child.alive?
    begin
      Process.kill("TERM", @child.pid)
    rescue Errno::ESRCH
      # process died between the alive? check and the signal; nothing to do
    end
  end

  unless @child.join(seconds_until_panic)
    if @child.alive?
      begin
        Process.kill("KILL", @child.pid)
      rescue Errno::ESRCH
        # same race as above: the process exited on its own just before KILL
      end
    end
  end

  # ensure kill doesn't return until process is truly gone
  # (there may be a chance of this deadlocking with a blocking callback... not sure)
  @cleanup_thread.join unless Thread.current == @cleanup_thread
end

#killed?Boolean

Returns:

  • (Boolean)


109
110
111
# File 'lib/job_pool/job.rb', line 109

# Returns the killed flag: false from construction, true once #kill has
# been called on this job.
def killed?
  @killed
end

#outputObject



92
93
94
# File 'lib/job_pool/job.rb', line 92

# Returns the text collected in the stdout object by calling its #string
# method. This works with the default StringIO; an object without #string
# (for example a File passed as :stdout) will raise NoMethodError.
def output
  @stdout.string
end

#read(*args) ⇒ Object



88
89
90
# File 'lib/job_pool/job.rb', line 88

# Reads from the job's stdout object, forwarding every argument to its
# #read method unchanged and returning whatever that call returns.
def read(*args)
  @stdout.read(*args)
end

#stopObject

waits patiently until the process terminates, then cleans up



133
134
135
136
137
138
# File 'lib/job_pool/job.rb', line 133

# Blocks until the process has terminated, then removes the job from its
# pool; the cleanup step runs inside the block handed to the pool's _remove.
def stop
  # all waiting happens before entering the pool's block
  wait_for_the_end
  @pool._remove(self) { _cleanup }
end

#success?Boolean

returns false if the process hasn’t finished yet

Returns:

  • (Boolean)


105
106
107
# File 'lib/job_pool/job.rb', line 105

# Whether the process ran to completion successfully.
# Always false while the job has not finished; otherwise the child's exit
# status decides, coerced to a strict true/false.
def success?
  return false unless finished?

  @child.value.success? ? true : false
end

#timed_out?Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/job_pool/job.rb', line 113

# Returns the timed-out flag: false from construction, set to true by the
# cleanup thread when the child did not exit within the timeout.
def timed_out?
  @timed_out
end

#write(*args) ⇒ Object

Parameters:

  • args (Array)

    the arguments passed straight through to the stdin object's write method.



84
85
86
# File 'lib/job_pool/job.rb', line 84

# Writes to the job's stdin object, forwarding every argument to its
# #write method unchanged and returning whatever that call returns.
def write(*args)
  @stdin.write(*args)
end