Class: JobPool::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/job_pool/job.rb

Overview

A Job keeps track of the child process that gets forked. Throughout this documentation, "job" means the Ruby data structure and "process" means the Unix process.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pool, cmd, inio = nil, outio = nil, errio = nil, timeout = nil) ⇒ Job

Runs cmd, passes inio to its stdin, and fills outio and errio with the command’s output. TODO: should specify args using keywords rather than position.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/job_pool/job.rb', line 20

# Starts +cmd+ as a child process via Open3.popen3, registers the new job
# with +pool+, and spawns the helper threads that service the child's pipes.
#
# @param pool    object that tracks jobs; +_add(self)+ is called on it here
# @param cmd     command to run; splatted into Open3.popen3, so an Array
#                is passed as separate arguments
# @param inio    data for the child's stdin. nil becomes an empty StringIO;
#                anything that does not respond to +readpartial+ is
#                converted with +to_s+ and wrapped in a StringIO
# @param outio   destination for the child's stdout (default: new StringIO)
# @param errio   destination for the child's stderr (default: new StringIO)
# @param timeout when truthy, the maximum time handed to Thread#join while
#                waiting for the child; nil/false waits without a limit
def initialize pool, cmd, inio=nil, outio=nil, errio=nil, timeout=nil
  @start_time = Time.now
  @pool  = pool
  @inio  = inio || StringIO.new
  # non-IO input (e.g. a plain String) is wrapped so it can be read like a stream
  @inio  = StringIO.new(@inio.to_s) unless @inio.respond_to?(:readpartial)
  @outio = outio || StringIO.new
  @errio = errio || StringIO.new
  # popen3 yields the child's stdin, stdout and stderr pipes plus its wait thread
  @chin, @chout, @cherr, @child = Open3.popen3(*cmd)

  @pool._add(self)
  # put the child's stdout pipe into binary mode
  @chout.binmode

  @killed = false
  @timed_out = false

  # one thread per pipe. drain is defined outside this view -- presumably it
  # copies from its first argument into its second (TODO confirm)
  @thrin  = Thread.new { drain(@inio, @chin) }
  @throut = Thread.new { drain(@chout, @outio) }
  @threrr = Thread.new { drain(@cherr, @errio) }

  # ensure cleanup is called when the child exits. (crazy that this requires a whole new thread!)
  @cleanup_thread = Thread.new do
    if timeout
      # TODO: inline outatime
      # join returns nil when the limit elapses first; outatime is defined
      # outside this view -- presumably it handles the timed-out child (TODO confirm)
      outatime unless @child.join(timeout)
    else
      @child.join
    end
    stop
  end
end

Instance Attribute Details

#errio ⇒ Object (readonly)

fds for child’s stdin/stdout/stderr



15
16
17
# File 'lib/job_pool/job.rb', line 15

# The stderr destination chosen by the constructor: its +errio+ argument,
# or a fresh StringIO when that argument was nil/false.
def errio
  @errio
end

#inio ⇒ Object (readonly)

fds for child’s stdin/stdout/stderr



15
16
17
# File 'lib/job_pool/job.rb', line 15

# The stdin source set up by the constructor: its +inio+ argument when that
# responds to +readpartial+, otherwise a StringIO wrapping +inio.to_s+
# (an empty StringIO when no +inio+ was given).
def inio
  @inio
end

#outio ⇒ Object (readonly)

fds for child’s stdin/stdout/stderr



15
16
17
# File 'lib/job_pool/job.rb', line 15

# The stdout destination chosen by the constructor: its +outio+ argument,
# or a fresh StringIO when that argument was nil/false.
def outio
  @outio
end

#start_time ⇒ Object (readonly)

start and finish times of this job



14
15
16
# File 'lib/job_pool/job.rb', line 14

# The Time captured as the first step of the constructor, i.e. just before
# the child process is spawned.
def start_time
  @start_time
end

#stop_time ⇒ Object (readonly)

start and finish times of this job



14
15
16
# File 'lib/job_pool/job.rb', line 14

# The Time recorded by _cleanup; nil while _cleanup has not yet run.
def stop_time
  @stop_time
end

Instance Method Details

#_child_thread ⇒ Object

only meant to be used by the ProcessMonitor



109
110
111
# File 'lib/job_pool/job.rb', line 109

# Returns the wait thread that Open3.popen3 handed back for the child
# process (the fourth value of its return tuple).
def _child_thread
  @child
end

#_cleanup ⇒ Object

may only be called once, synchronized by stop()



114
115
116
117
# File 'lib/job_pool/job.rb', line 114

# Records the moment the job finished.
#
# Must run at most once per job: a second call raises a RuntimeError
# because the stop time has already been recorded.
#
# @return [Time] the stop time that was just stored
def _cleanup
  if @stop_time
    raise "Someone else already cleaned up this job?!"
  end

  @stop_time = Time.now
end

#_deactivate ⇒ Object

returns true if process was previously active. must be externally synchronized. TODO: this is a terrible api. gotta be a way to clean it up.



121
122
123
124
125
# File 'lib/job_pool/job.rb', line 121

# Marks the job inactive and reports whether it was still active beforehand.
#
# Not synchronized internally; callers must serialise access themselves.
#
# @return [Boolean] true on the first call, false on every later call
def _deactivate
  was_active = !@inactive
  @inactive = true
  was_active
end

#error ⇒ Object



63
64
65
# File 'lib/job_pool/job.rb', line 63

# Returns the text accumulated in the stderr destination by calling
# +string+ on it. This works for the default StringIO; an +errio+ object
# without a +string+ method would raise NoMethodError here.
def error
  @errio.string
end

#finished? ⇒ Boolean

Returns:

  • (Boolean)


67
68
69
# File 'lib/job_pool/job.rb', line 67

# Whether the job has been cleaned up, i.e. a stop time has been recorded.
#
# @return [Boolean]
def finished?
  !@stop_time.nil?
end

#kill(seconds_until_panic = 2) ⇒ Object

Kills the child process now (sending SIGTERM first, then SIGKILL if it has not exited in time), then waits until it’s truly gone.



85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/job_pool/job.rb', line 85

# Terminates the child process and does not return until it is gone.
#
# Sends SIGTERM first; if the child's wait thread has not finished within
# +seconds_until_panic+ seconds, sends SIGKILL. Finally joins the cleanup
# thread (unless called from that very thread) so the job is fully cleaned
# up when this method returns.
#
# Only Errno::ESRCH ("no such process") is tolerated while signalling, since
# the child may exit between the +alive?+ check and the signal being sent.
# Any other error from Process.kill propagates to the caller.
#
# @param seconds_until_panic [Numeric] grace period between TERM and KILL
# @return [Thread, nil] the joined cleanup thread, or nil when called from it
def kill(seconds_until_panic = 2)
  @killed = true

  if @child.alive?
    begin
      Process.kill("TERM", @child.pid)
    rescue Errno::ESRCH
      # process died between the alive? check and the signal; nothing to do
    end
  end

  unless @child.join(seconds_until_panic)
    begin
      Process.kill("KILL", @child.pid) if @child.alive?
    rescue Errno::ESRCH
      # same race as above: the child exited just before SIGKILL was sent
    end
  end

  # ensure kill doesn't return until process is truly gone
  # (there may be a chance of this deadlocking with a blocking callback... not sure)
  @cleanup_thread.join unless Thread.current == @cleanup_thread
end

#killed? ⇒ Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/job_pool/job.rb', line 76

# Whether kill has been called on this job. The flag is initialised to
# false by the constructor and set to true at the start of kill.
def killed?
  @killed
end

#output ⇒ Object



59
60
61
# File 'lib/job_pool/job.rb', line 59

# Returns the text accumulated in the stdout destination by calling
# +string+ on it. This works for the default StringIO; an +outio+ object
# without a +string+ method would raise NoMethodError here.
def output
  @outio.string
end

#read(*args) ⇒ Object



55
56
57
# File 'lib/job_pool/job.rb', line 55

# Reads from the job's stdout destination, forwarding every argument
# unchanged to its +read+ method.
#
# @return whatever +@outio.read+ returns for the given arguments
def read(*args)
  @outio.read(*args)
end

#stop ⇒ Object

waits patiently until the process terminates, then cleans up



100
101
102
103
104
105
# File 'lib/job_pool/job.rb', line 100

# Blocks until the child process has ended, then removes this job from the
# pool and runs _cleanup inside the block handed to the pool's _remove.
#
# @return whatever +@pool._remove+ returns
def stop
  # all waiting happens here, before entering the pool's synchronized section
  wait_for_the_end
  @pool._remove(self) { _cleanup }
end

#success? ⇒ Boolean

returns false if the process hasn’t finished yet

Returns:

  • (Boolean)


72
73
74
# File 'lib/job_pool/job.rb', line 72

# Whether the job has finished and its child process reported success.
#
# Always answers with a strict true/false: an unfinished job yields false,
# and a nil from the exit status' +success?+ is also reported as false.
#
# @return [Boolean]
def success?
  return false unless finished?

  @child.value.success? ? true : false
end

#timed_out? ⇒ Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/job_pool/job.rb', line 80

# Returns the timed-out flag, which the constructor initialises to false.
# NOTE(review): the code that sets it to true is not visible here --
# presumably outatime does so when the timeout elapses; confirm.
def timed_out?
  @timed_out
end

#write(*args) ⇒ Object



51
52
53
# File 'lib/job_pool/job.rb', line 51

# Writes to the job's stdin source object, forwarding every argument
# unchanged to its +write+ method.
#
# @return whatever +@inio.write+ returns for the given arguments
def write(*args)
  @inio.write(*args)
end