Class: Levdon::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/levdon.rb

Overview

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(&block) ⇒ Worker

Returns a new instance of Worker.



129
130
131
132
133
134
# File 'lib/levdon.rb', line 129

def initialize(&block)
  @child_read, @parent_write = create_pipe
  @parent_read, @child_write = create_pipe
  @block = block
  @io_stream = NonBlockLineStream.new(@parent_read,@parent_write)
end

Instance Attribute Details

#pidObject (readonly)

Returns the value of attribute pid.



127
128
129
# File 'lib/levdon.rb', line 127

def pid
  @pid
end

Instance Method Details

#alive?Boolean

Returns:

  • (Boolean)


174
175
176
177
178
179
# File 'lib/levdon.rb', line 174

def alive?
  Process.kill(0, @pid)
  true
rescue Errno::ESRCH
  false
end

#async_execute(*msg) ⇒ Object



164
165
166
# File 'lib/levdon.rb', line 164

def async_execute(*msg)
  nonblock_write_to_child(msg)
end

#create_pipeObject



136
137
138
# File 'lib/levdon.rb', line 136

def create_pipe
  IO.pipe.map{|pipe| pipe.tap{|_| _.set_encoding("ASCII-8BIT", "ASCII-8BIT") } }
end

#execute(*msg) ⇒ Object



159
160
161
162
# File 'lib/levdon.rb', line 159

def execute(*msg)
  write_to_child(msg)
  Thread.new { read_from_child }
end

#install_exit_handlerObject



238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/levdon.rb', line 238

def install_exit_handler
  at_exit do
    next unless alive?
    begin
      Process.kill("KILL", @pid)
      Process.wait(@pid)
    rescue Errno::ESRCH
      # noop
    rescue => e
      puts "error at_exit: #{ e }"
      raise e
    end
  end
end

#install_signal_handlerObject



253
254
255
256
257
258
259
260
261
# File 'lib/levdon.rb', line 253

def install_signal_handler
  [:INT, :QUIT].each do |signal|
    old_handler = Signal.trap(signal) {
      Process.kill(signal, @pid)
      Process.wait(@pid)
      old_handler.call
    }
  end
end

#nonblock_read_from_childObject



195
196
197
198
199
200
201
# File 'lib/levdon.rb', line 195

def nonblock_read_from_child()
  data = @io_stream.read
  if(data)
    return Marshal.load(data.chomp.gsub("@NDELIMITER@", "\n"))
  end
  return nil
end

#nonblock_write_to_child(obj) ⇒ Object



203
204
205
206
# File 'lib/levdon.rb', line 203

def nonblock_write_to_child(obj)
  data = Marshal.dump(obj).gsub("\n", "@NDELIMITER@") + "\n"
  @io_stream.write(data)
end

#pollObject



181
182
183
# File 'lib/levdon.rb', line 181

def poll
  @io_stream.poll
end

#read_from_childObject



208
209
210
# File 'lib/levdon.rb', line 208

def read_from_child
  read_object(@parent_read)
end

#read_from_parentObject



216
217
218
# File 'lib/levdon.rb', line 216

def read_from_parent
  read_object(@child_read)
end

#read_object(read) ⇒ Object



190
191
192
193
# File 'lib/levdon.rb', line 190

def read_object(read)
  data = read.gets
  Marshal.load(data.chomp.gsub("@NDELIMITER@", "\n"))
end

#runObject



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/levdon.rb', line 140

def run
  @pid = fork do
    @parent_read.close
    @parent_write.close
    write_to_parent(:ready)
    loop do
      args = read_from_parent
      break if args == :stop 
      result = @block.call(*args)
      write_object(result, @child_write)
    end

    @child_read.close
    @child_write.close
  end

  wait_after_fork if @pid
end

#stopObject



168
169
170
171
172
# File 'lib/levdon.rb', line 168

def stop
  return unless alive?
  write_to_child(:stop)
  Process.wait(@pid)
end

#wait_after_forkObject



224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/levdon.rb', line 224

def wait_after_fork
  @child_read.close
  @child_write.close

  install_exit_handler
  install_signal_handler
  
  Thread.new {
    result = read_from_child
    raise "Failed to start worker pid #{ @pid }" unless result == :ready
    result
  }
end

#write_object(obj, write) ⇒ Object



185
186
187
188
# File 'lib/levdon.rb', line 185

def write_object(obj, write)
  data = Marshal.dump(obj).gsub("\n", "@NDELIMITER@") + "\n"
  write.write data
end

#write_to_child(obj) ⇒ Object



212
213
214
# File 'lib/levdon.rb', line 212

def write_to_child(obj)
  write_object(obj, @parent_write)
end

#write_to_parent(obj) ⇒ Object



220
221
222
# File 'lib/levdon.rb', line 220

def write_to_parent(obj)
  write_object(obj, @child_write)
end