Class: Rubysh::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/rubysh/runner.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(command) ⇒ Runner

Returns a new instance of Runner.



5
6
7
8
9
10
11
12
13
14
15
# File 'lib/rubysh/runner.rb', line 5

# Builds a runner around +command+ and prepares it right away.
#
# @param command the command object this runner drives
def initialize(command)
  # run_async refuses to start unless the runner is still in this state.
  @runner_state = :initialized
  @command = command

  # Both start out empty; @state is filled on demand by #state.
  @targets, @state = {}, {}

  # No parallel IO object exists at construction time.
  @parallel_io = nil

  prepare!
end

Instance Attribute Details

#command ⇒ Object

Returns the value of attribute command.



3
4
5
# File 'lib/rubysh/runner.rb', line 3

# Reader for the command object that was passed to the constructor.
def command
  @command
end

#parallel_io ⇒ Object

Returns the value of attribute parallel_io.



3
4
5
# File 'lib/rubysh/runner.rb', line 3

# Reader for @parallel_io. The constructor sets it to nil; read, write and
# communicate all go through this object once the runner has been started.
def parallel_io
  @parallel_io
end

#targets ⇒ Object

Returns the value of attribute targets.



3
4
5
# File 'lib/rubysh/runner.rb', line 3

# Reader for the @targets hash, which maps a target name to that target's
# state hash (the keys :target and :target_reading? are used by readers/writers).
def targets
  @targets
end

Instance Method Details

#check_call(&blk) ⇒ Object



132
133
134
135
136
137
138
139
140
# File 'lib/rubysh/runner.rb', line 132

# Runs the command to completion and raises if it did not succeed.
#
# The block, if given, is forwarded to #run, which calls it with the runner
# after the command has been started.
#
# @yield [runner] passed through to #run
# @return [Runner] self
# @raise [Rubysh::Error::BadExitError] when the resulting status is not a success
def check_call(&blk)
  run(&blk)
  status = full_status
  unless status.success?
    raise Rubysh::Error::BadExitError.new("#{@command} exited with #{rendered_status(status)}")
  end
  self
end

#communicate ⇒ Object



82
83
84
85
86
87
88
# File 'lib/rubysh/runner.rb', line 82

# Closes every write-side target that is still open, then waits for the
# command to finish.
#
# @return whatever #wait returns
# @raise [Rubysh::Error::AlreadyRunError] unless the runner is in state :started
def communicate
  if @runner_state != :started
    raise Rubysh::Error::AlreadyRunError.new("Can only communicate with a runner in runner_state :started, not #{@runner_state.inspect}")
  end

  writers.each_pair do |io, target_name|
    next if io.closed?
    @parallel_io.close(target_name)
  end

  wait
end

#exec_error ⇒ Object



110
111
112
113
# File 'lib/rubysh/runner.rb', line 110

# Returns the exec_error reported by the subprocess stored under
# :subprocess in this runner's state for @command.
def exec_error
  state(@command)[:subprocess].exec_error
end

#exitstatus(command = nil) ⇒ Object

Convenience wrapper



102
103
104
105
106
107
108
# File 'lib/rubysh/runner.rb', line 102

# Convenience wrapper: the exitstatus of #full_status, or nil when no
# status is available.
#
# @param command passed straight through to #full_status
def exitstatus(command=nil)
  st = full_status(command)
  st ? st.exitstatus : nil
end

#full_status(command = nil) ⇒ Object

Ruby’s Process::Status. Has fun things like pid and signaled?



91
92
93
94
# File 'lib/rubysh/runner.rb', line 91

# Status of a command as seen by this runner (Ruby's Process::Status,
# per the surrounding documentation).
#
# @param command the command to query; defaults to the runner's own command
# @return whatever command.status(self) returns
def full_status(command=nil)
  # Ask the command that was requested, not unconditionally @command.
  (command || @command).status(self)
end

#inspect ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/rubysh/runner.rb', line 166

# Human-readable summary: the class name, the stringified command and, in
# parentheses, any of: reader target names, writer target names, and either
# the rendered status or (when there is no status yet) the pid.
#
# @return [String]
def inspect
  extras = []
  valid_readers = readers.values.map(&:inspect).join(', ')
  valid_writers = writers.values.map(&:inspect).join(', ')

  extras << "readers: #{valid_readers}" unless valid_readers.empty?
  extras << "writers: #{valid_writers}" unless valid_writers.empty?
  if status = full_status
    extras << rendered_status(status)
  elsif mypid = pid
    # Reuse the value just fetched rather than querying pid a second time.
    extras << "pid: #{mypid}"
  end
  extra_display = extras.empty? ? nil : " (#{extras.join(', ')})"

  "#{self.class}: #{command.stringify}#{extra_display}"
end

#kill(signal = 'TERM') ⇒ Object



213
214
215
216
217
218
219
220
# File 'lib/rubysh/runner.rb', line 213

# Sends +signal+ to the runner's process.
#
# @param signal the signal handed to Process.kill (defaults to 'TERM')
# @return the result of Process.kill
# @raise [Rubysh::Error::BaseError] when the runner has no pid yet
def kill(signal='TERM')
  target_pid = pid
  raise Rubysh::Error::BaseError.new("You haven't started this runner yet.") unless target_pid

  Rubysh.log.debug("Sending #{signal} to #{target_pid}")
  Process.kill(signal, target_pid)
end

#pid(command = nil) ⇒ Object



96
97
98
99
# File 'lib/rubysh/runner.rb', line 96

# Pid of a command as seen by this runner.
#
# @param command the command to query; defaults to the runner's own command
# @return whatever command.pid(self) returns
def pid(command=nil)
  # Ask the command that was requested, not unconditionally @command.
  (command || @command).pid(self)
end

#read(target = nil, opts = nil) ⇒ Object

A bit of an unorthodox read interface, not sure if I like it. Also, the target/opts magic is probably too magical (and not consistent with write!)

You can do:

read: finish the subprocess, and read from FD 1 in the child

read(:how => :partial): wait until there are bytes on FD 1, and then return what you can

read(2, :how => :partial): do the same for FD 2

read(:stdout, :how => :partial): do the same with whatever the named descriptor :stdout is

read(:how => :nonblock): return whatever is immediately available



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/rubysh/runner.rb', line 37

# Reads the not-yet-consumed bytes buffered for a read target.
#
# Call forms:
#   read                            - finish the subprocess first (via
#                                     #communicate if still :started), then read
#   read(:how => :partial)          - drive the parallel IO until new bytes
#                                     arrive; nil once it reports finalized
#   read(:how => :nonblock)         - pick up what is available now; nil when
#                                     nothing new has arrived
#   read(2, :how => :partial)       - same, for another fd number
#   read(:stdout, :how => :partial) - same, for a named target
#
# @param target [Symbol, Integer, Hash, nil] target to read from (defaults to 1);
#   a Hash in this position is treated as +opts+
# @param opts [Hash, nil] only the :how key is accepted
# @return the bytes read from the target's buffer starting at the last read
#   position, or nil in the cases noted above
# @raise [Rubysh::Error::AlreadyRunError] unless the runner is :started or :waited
# @raise [Rubysh::Error::BaseError] when :on_read was given to the command, on
#   unknown option keys, or on an unknown :how value
def read(target=nil, opts=nil)
  unless @runner_state == :started || @runner_state == :waited
    raise Rubysh::Error::AlreadyRunError.new("Can only read from a runner in runner_state :started or :waited, not #{@runner_state.inspect}")
  end
  raise Rubysh::Error::BaseError.new("Can't read from a runner where :on_read has been provided") if command.opts[:on_read]

  # Support read(:how => ...) with no explicit target.
  if target.kind_of?(Hash)
    opts = target
    target = nil
  end
  target ||= 1
  opts ||= {}

  # TODO: add a stringio
  state = target_state(target, true)

  # Be nice to people and validate the hash
  valid_keys = [:how]
  extra_keys = opts.keys - valid_keys
  raise Rubysh::Error::BaseError.new("Unrecognized keys #{extra_keys.inspect}. (Valid keys: #{valid_keys.inspect})") unless extra_keys.empty?

  case how = opts[:how]
  when :partial
    # Read until we get some bytes
    while state[:buffer].length == state[:read_pos]
      # If everything's exited, just return nil
      return nil if @parallel_io.finalized
      @parallel_io.run_once
    end
  when :nonblock
    @parallel_io.read_available(state[:target])
    # Return nil unless we have something new
    return nil if state[:buffer].length == state[:read_pos]
  when nil
    communicate if @runner_state == :started
  else
    raise Rubysh::Error::BaseError.new("Invalid read directive #{how.inspect}")
  end

  # Resume from where the previous read stopped and remember the new offset.
  state[:buffer].pos = state[:read_pos]
  bytes = state[:buffer].read
  # Could also increment by bytes, but meh.
  state[:read_pos] = state[:buffer].pos
  bytes
end

#readers ⇒ Object



142
143
144
145
146
147
148
149
150
# File 'lib/rubysh/runner.rb', line 142

# Collects the targets flagged :target_reading? into a hash keyed by each
# state's :target, with the target name as the value.
#
# @return [Hash]
def readers
  @targets.each_with_object({}) do |(name, info), by_target|
    by_target[info[:target]] = name if info[:target_reading?]
  end
end

#run(input = {}, &blk) ⇒ Object



124
125
126
127
128
129
130
# File 'lib/rubysh/runner.rb', line 124

# Starts the command, optionally hands the runner to the block, then runs
# the IO loop and waits for completion.
#
# @param input accepted but not used by this method
# @yield [runner] called after the command has been started
# @return [Runner] self
def run(input={}, &blk)
  run_async
  blk.call(self) unless blk.nil?
  run_io
  do_wait

  self
end

#run_async ⇒ Object

API for running/waiting



116
117
118
119
120
121
122
# File 'lib/rubysh/runner.rb', line 116

# Starts the command without waiting for it and marks the runner :started.
#
# @return [Runner] self
# @raise [Rubysh::Error::AlreadyRunError] when the runner is no longer :initialized
def run_async
  if @runner_state != :initialized
    raise Rubysh::Error::AlreadyRunError.new("You have already run this #{self.class} instance. Cannot run again. You can run its command directly though, which will create a fresh #{self.class} instance.")
  end

  @command.start_async(self)
  prepare_io
  @runner_state = :started

  self
end

#state(object) ⇒ Object

Internal helpers



184
185
186
# File 'lib/rubysh/runner.rb', line 184

# Internal helper: the state hash kept for +object+, created empty the first
# time it is asked for.
#
# @return [Hash]
def state(object)
  @state[object] = {} unless @state[object]
  @state[object]
end

#target_state(target_name, reading = nil) ⇒ Object

Internal helpers



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/rubysh/runner.rb', line 189

# Internal helper: looks up the state hash for a target.
#
# @param target_name [Symbol, Integer] a target symbol (looked up in @targets)
#   or an fd number (looked up via targets_by_fd_numbers)
# @param reading [true, false, nil] when non-nil, the direction the caller
#   intends to use; a mismatch with the target's :target_reading? flag raises
# @return [Hash] the target's state
# @raise [Rubysh::Error::BaseError] on an unknown target, an unsupported
#   target type, or a direction mismatch
def target_state(target_name, reading=nil)
  case target_name
  when Symbol
    found = @targets[target_name]
    raise Rubysh::Error::BaseError.new("Invalid target symbol: #{target_name.inspect} (valid target symbols are: #{@targets.keys.inspect})") unless found
  when Integer
    # Integer rather than Fixnum: the Fixnum constant no longer exists on
    # current Rubies, and Integer matches on old and new versions alike.
    fd_targets = targets_by_fd_numbers
    found = fd_targets[target_name]
    raise Rubysh::Error::BaseError.new("Invalid target fd number: #{target_name.inspect} (valid target fd numbers are: #{fd_targets.keys.inspect})") unless found
  else
    raise Rubysh::Error::BaseError.new("Invalid type for target name: #{target_name.inspect} (#{target_name.class}). Valid types are Symbol and Integer.")
  end

  if reading.nil?
    # No checking
  elsif found[:target_reading?] && !reading
    raise Rubysh::Error::BaseError.new("Trying to write to read pipe #{target_name}")
  elsif !found[:target_reading?] && reading
    raise Rubysh::Error::BaseError.new("Trying to read from write pipe #{target_name}")
  end

  found
end

#to_s ⇒ Object



162
163
164
# File 'lib/rubysh/runner.rb', line 162

# String form of the runner; simply delegates to #inspect.
def to_s
  inspect
end

#wait ⇒ Object



222
223
224
225
# File 'lib/rubysh/runner.rb', line 222

# Runs the IO loop and then waits on the command; the same final two steps
# that #run performs. Returns whatever do_wait returns.
def wait
  run_io
  do_wait
end

#write(bytes, target = 0) ⇒ Object



17
18
19
20
21
22
# File 'lib/rubysh/runner.rb', line 17

# Queues +bytes+ for writing to one of the command's write targets.
#
# @param bytes the data handed to the parallel IO object
# @param target [Symbol, Integer] target to write to (defaults to fd 0)
# @return whatever @parallel_io.write returns
# @raise [Rubysh::Error::AlreadyRunError] unless the runner is in state :started
def write(bytes, target=0)
  if @runner_state != :started
    raise Rubysh::Error::AlreadyRunError.new("Can only write to a runner in runner_state :started, not #{@runner_state.inspect}")
  end

  # target_state(…, false) rejects targets that are flagged for reading.
  destination = target_state(target, false)[:target_name]
  @parallel_io.write(destination, bytes, false)
end

#writers ⇒ Object



152
153
154
155
156
157
158
159
160
# File 'lib/rubysh/runner.rb', line 152

# Collects the targets NOT flagged :target_reading? into a hash keyed by each
# state's :target, with the target name as the value.
#
# @return [Hash]
def writers
  @targets.each_with_object({}) do |(name, info), by_target|
    by_target[info[:target]] = name unless info[:target_reading?]
  end
end