Class: Rubysh::Subprocess::ParallelIO

Inherits:
Object
  • Object
show all
Defined in:
lib/rubysh/subprocess/parallel_io.rb

Direct Known Subclasses

PidAwareParallelIO

Defined Under Namespace

Modules: EOF Classes: NothingAvailable

Instance Method Summary collapse

Constructor Details

#initialize(readers, writers) ⇒ ParallelIO

readers/writers should be hashes mapping each IO object => its name



7
8
9
10
11
12
13
14
15
16
# File 'lib/rubysh/subprocess/parallel_io.rb', line 7

# Stores the given readers and writers and resets all bookkeeping state.
#
# @param readers [Hash] maps each reader object to its name
# @param writers [Hash] maps each writer object to its name
def initialize(readers, writers)
  # No callbacks are installed until on_read / on_write are called.
  @on_read = nil
  @on_write = nil

  @readers = readers
  @writers = writers

  # Nothing has finished yet, and no data is queued for any writer.
  @finished_readers = Set.new
  @finished_writers = Set.new
  @writer_buffers = {}
end

Instance Method Details

#available_readers ⇒ Object



62
63
64
65
# File 'lib/rubysh/subprocess/parallel_io.rb', line 62

# Lists the registered readers that can still be read from.
#
# A reader is left out once it has been recorded in @finished_readers or
# once it reports itself as closed.
#
# @return [Array] the remaining readers, in registration order
def available_readers
  @readers.keys.reject do |reader|
    @finished_readers.include?(reader) || reader.closed?
  end
end

#available_writers ⇒ Object

Writers with a non-zero number of bytes remaining to write



68
69
70
71
# File 'lib/rubysh/subprocess/parallel_io.rb', line 68

# Lists the writers that still have buffered data waiting to be written.
#
# Only writers with an entry in @writer_buffers are considered. A writer is
# left out when it is recorded in @finished_writers, when it reports itself
# as closed, or when get_data returns nothing of non-zero length for it.
#
# @return [Array] the writers with pending data
def available_writers
  @writer_buffers.keys.reject do |writer|
    @finished_writers.include?(writer) ||
      writer.closed? ||
      get_data(writer).length.zero?
  end
end

#close(writer_name) ⇒ Object



57
58
59
60
# File 'lib/rubysh/subprocess/parallel_io.rb', line 57

# Closes the writer registered under the given name.
#
# @param writer_name the name used to look the writer up via writer_by_name
# @return whatever the writer's own #close returns
def close(writer_name)
  writer_by_name(writer_name).close
end

#consume_all_available ⇒ Object



107
108
109
110
111
112
# File 'lib/rubysh/subprocess/parallel_io.rb', line 107

# Repeatedly runs the select loop with a zero timeout until it signals,
# by raising NothingAvailable, that there is nothing left to process.
#
# Any other exception raised by run_select_loop propagates to the caller.
#
# @return [nil]
def consume_all_available
  loop { run_select_loop(0) }
rescue NothingAvailable
  # This is the normal way out of the loop, not an error.
  nil
end

#on_read(method = nil, &blk) ⇒ Object



26
27
28
29
# File 'lib/rubysh/subprocess/parallel_io.rb', line 26

# Installs the handler stored in @on_read, replacing any previous one.
#
# The handler may be given either as an argument or as a block, but not
# both. Calling with neither clears the handler (it becomes nil).
#
# @param method a callable to use as the handler
# @return the handler that was stored
# @raise [RuntimeError] if both an argument and a block are supplied
def on_read(method=nil, &blk)
  if method && blk
    raise "Can't provide both method and block"
  end

  handler = method || blk
  @on_read = handler
end

#on_write(method = nil, &blk) ⇒ Object



31
32
33
34
# File 'lib/rubysh/subprocess/parallel_io.rb', line 31

# Installs the handler stored in @on_write, replacing any previous one.
#
# The handler may be given either as an argument or as a block, but not
# both. Calling with neither clears the handler (it becomes nil).
#
# @param method a callable to use as the handler
# @return the handler that was stored
# @raise [RuntimeError] if both an argument and a block are supplied
def on_write(method=nil, &blk)
  if method && blk
    raise "Can't provide both method and block"
  end

  handler = method || blk
  @on_write = handler
end

#read_available(reader) ⇒ Object



114
115
116
117
118
119
120
121
122
123
# File 'lib/rubysh/subprocess/parallel_io.rb', line 114

# Attempts one non-blocking read of up to 4096 bytes from the reader.
#
# * On EOFError or Errno::EPIPE the reader is handed to finalize_reader.
# * On Errno::EAGAIN, Errno::EWOULDBLOCK or Errno::EINTR nothing happens;
#   there was simply no data to take right now.
# * Otherwise the data is passed to issue_reader_callback.
#
# @param reader the object to read from (must support read_nonblock)
# @return the result of finalize_reader or issue_reader_callback, or nil
#   when no data was available
def read_available(reader)
  chunk = begin
    reader.read_nonblock(4096)
  rescue EOFError, Errno::EPIPE
    return finalize_reader(reader)
  rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
    return nil
  end

  # Deliberately outside the begin block so that errors raised by the
  # callback are never mistaken for read errors.
  issue_reader_callback(reader, chunk)
end

#register_reader(reader, name) ⇒ Object



18
19
20
# File 'lib/rubysh/subprocess/parallel_io.rb', line 18

# Adds a reader to @readers under the given name. Registering the same
# reader again replaces its name.
#
# @param reader the reader object (used as the hash key)
# @param name the name to associate with it
# @return the name that was stored
def register_reader(reader, name)
  @readers.store(reader, name)
end

#register_writer(writer, name) ⇒ Object



22
23
24
# File 'lib/rubysh/subprocess/parallel_io.rb', line 22

# Adds a writer to @writers under the given name. Registering the same
# writer again replaces its name.
#
# @param writer the writer object (used as the hash key)
# @param name the name to associate with it
# @return the name that was stored
def register_writer(writer, name)
  @writers.store(writer, name)
end

#run ⇒ Object



73
74
75
76
77
# File 'lib/rubysh/subprocess/parallel_io.rb', line 73

# Keeps calling run_once until there are neither writers with pending data
# nor readers left to read from.
#
# The writers are checked first; the readers are only consulted when no
# writer is available.
#
# @return [nil]
def run
  run_once until available_writers.empty? && available_readers.empty?
end

#run_once(timeout = nil) ⇒ Object

This method is a stub so it can be extended in subclasses



80
81
82
# File 'lib/rubysh/subprocess/parallel_io.rb', line 80

# Performs a single pass by delegating straight to run_select_loop.
#
# It exists as its own method so that subclasses have a hook to extend.
#
# @param timeout passed through unchanged to run_select_loop; defaults to nil
# @return whatever run_select_loop returns
def run_once(timeout=nil)
  run_select_loop(timeout)
end

#run_select_loop(timeout) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/rubysh/subprocess/parallel_io.rb', line 84

# Waits, via IO.select, for any available reader or writer to become ready,
# then services every ready reader followed by every ready writer.
#
# The candidate lists are computed once up front; if the select call is
# interrupted (Errno::EINTR) it is retried with those same lists.
#
# NOTE(review): with no candidates and a nil timeout IO.select has nothing
# that could wake it up; #run avoids this by checking for candidates first.
#
# @param timeout passed to IO.select as its timeout argument
# @raise [NothingAvailable] if IO.select returns nil, i.e. the timeout
#   elapsed without anything becoming ready
def run_select_loop(timeout)
  read_candidates = available_readers
  write_candidates = available_writers

  selected = begin
    IO.select(read_candidates, write_candidates, nil, timeout)
  rescue Errno::EINTR
    retry
  end
  raise NothingAvailable unless selected

  readable, writable = selected
  readable.each { |reader| read_available(reader) }
  writable.each { |writer| write_available(writer) }
end

#write(writer_name, data, close_on_complete = true) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/rubysh/subprocess/parallel_io.rb', line 36

# Queues data to be written to the named writer.
#
# The first write to a writer creates its buffer entry in @writer_buffers.
# Each call appends to the queued data and records the latest
# close_on_complete flag. Once a write has been made with a truthy
# close_on_complete, any further write to that writer is rejected.
#
# @param writer_name the name used to look the writer up via writer_by_name
# @param data [String] the data to append to the writer's buffer
# @param close_on_complete flag stored with the buffer; defaults to true
# @return whatever finalize_writer_if_done returns
# @raise [Rubysh::Error::AlreadyClosedError] if the writer was already
#   marked close_on_complete by an earlier write
def write(writer_name, data, close_on_complete=true)
  writer = writer_by_name(writer_name)

  @writer_buffers[writer] ||= {:data => '', :close_on_complete => nil}
  state = @writer_buffers[writer]

  if state[:close_on_complete]
    raise Rubysh::Error::AlreadyClosedError, "You have already marked #{writer.inspect} as close_on_complete; can't write more data"
  end

  state[:close_on_complete] = close_on_complete
  # XXX: unnecessary copy here. A new string is built rather than appending
  # in place, so a previously handed-out buffer string is never mutated.
  state[:data] = state[:data] + data

  # Note that this leads to slightly odd semantics if write('') is called
  # from within an on_write handler, since this runs synchronously. May
  # want to change at some point.
  finalize_writer_if_done(writer)
end