Class: Rubysh::Subprocess::ParallelIO
- Inherits:
-
Object
- Object
- Rubysh::Subprocess::ParallelIO
show all
- Defined in:
- lib/rubysh/subprocess/parallel_io.rb
Defined Under Namespace
Modules: EOF
Classes: NothingAvailable
Instance Method Summary
collapse
Constructor Details
#initialize(readers, writers) ⇒ ParallelIO
readers/writers should be hashes mapping each IO object => its name
7
8
9
10
11
12
13
14
15
16
|
# File 'lib/rubysh/subprocess/parallel_io.rb', line 7
# Build the bookkeeping for a group of reader and writer IO objects.
#
# readers and writers are stored as given; register_reader and
# register_writer later add entries keyed by IO object with the name as
# the value, so both are expected to be hashes of that shape.
def initialize(readers, writers)
  # Read side: the registered readers, the ones to leave out of
  # available_readers, and the (initially unset) read callback.
  @readers = readers
  @finished_readers = Set.new
  @on_read = nil

  # Write side: the same three pieces of state, plus the per-writer
  # buffers that #write fills in.
  @writers = writers
  @finished_writers = Set.new
  @on_write = nil
  @writer_buffers = {}
end
|
Instance Method Details
#available_readers ⇒ Object
62
63
64
65
|
# File 'lib/rubysh/subprocess/parallel_io.rb', line 62
# Registered readers that are neither marked finished nor closed.
#
# Returns an Array of reader objects, in the key order of @readers.
def available_readers
  @readers.keys.reject do |reader|
    # closed? is only consulted for readers that are not already finished.
    @finished_readers.include?(reader) || reader.closed?
  end
end
|
#available_writers ⇒ Object
Writers with a non-zero number of bytes remaining to write
68
69
70
71
|
# File 'lib/rubysh/subprocess/parallel_io.rb', line 68
# Writers with a non-zero number of bytes remaining to write.
#
# Only writers that have an entry in @writer_buffers are considered; a
# writer is dropped if it is marked finished, is closed, or get_data
# reports zero length for it.
def available_writers
  @writer_buffers.keys.select do |writer|
    next false if @finished_writers.include?(writer)
    next false if writer.closed?
    get_data(writer).length > 0
  end
end
|
#close(writer_name) ⇒ Object
57
58
59
60
|
# File 'lib/rubysh/subprocess/parallel_io.rb', line 57
# Close the writer registered under writer_name.
#
# The writer is looked up with writer_by_name and its #close is called
# directly; the result of that call is returned.
def close(writer_name)
  writer_by_name(writer_name).close
end
|
#consume_all_available ⇒ Object
107
108
109
110
111
112
|
# File 'lib/rubysh/subprocess/parallel_io.rb', line 107
# Keep running run_select_loop with a zero timeout until it raises
# NothingAvailable, then return nil.
#
# Any other exception propagates to the caller.
def consume_all_available
  loop do
    run_select_loop(0)
  end
rescue NothingAvailable
  # Nothing left to process right now; this is the normal way out.
end
|
#on_read(method = nil, &blk) ⇒ Object
26
27
28
29
|
# File 'lib/rubysh/subprocess/parallel_io.rb', line 26
# Set the read callback, given either as a callable argument or as a
# block (not both).
#
# Calling with neither resets the stored callback to nil. Returns the
# value that was stored. Raises RuntimeError when both are supplied.
def on_read(method=nil, &blk)
  if method && blk
    raise "Can't provide both method and block"
  end

  handler = method || blk
  @on_read = handler
end
|
#on_write(method = nil, &blk) ⇒ Object
31
32
33
34
|
# File 'lib/rubysh/subprocess/parallel_io.rb', line 31
# Set the write callback, given either as a callable argument or as a
# block (not both).
#
# Calling with neither resets the stored callback to nil. Returns the
# value that was stored. Raises RuntimeError when both are supplied.
def on_write(method=nil, &blk)
  if method && blk
    raise "Can't provide both method and block"
  end

  handler = method || blk
  @on_write = handler
end
|
#read_available(reader) ⇒ Object
114
115
116
117
118
119
120
121
122
123
|
# File 'lib/rubysh/subprocess/parallel_io.rb', line 114
# Try one non-blocking read of up to 4096 bytes from reader.
#
# - data read: hands it to issue_reader_callback and returns its result
# - EOFError / Errno::EPIPE: hands the reader to finalize_reader
# - Errno::EAGAIN / EWOULDBLOCK / EINTR: does nothing and returns nil
def read_available(reader)
  chunk = reader.read_nonblock(4096)
rescue EOFError, Errno::EPIPE
  finalize_reader(reader)
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
  # Nothing to read at the moment; leave the reader alone.
  nil
else
  # Runs outside the rescue clauses above, so errors raised by the
  # callback are not swallowed.
  issue_reader_callback(reader, chunk)
end
|
#register_reader(reader, name) ⇒ Object
18
19
20
|
# File 'lib/rubysh/subprocess/parallel_io.rb', line 18
# Record reader under name in the readers hash, replacing any earlier
# entry for the same reader. Returns name.
def register_reader(reader, name)
  @readers.store(reader, name)
end
|
#register_writer(writer, name) ⇒ Object
22
23
24
|
# File 'lib/rubysh/subprocess/parallel_io.rb', line 22
# Record writer under name in the writers hash, replacing any earlier
# entry for the same writer. Returns name.
def register_writer(writer, name)
  @writers.store(writer, name)
end
|
#run ⇒ Object
73
74
75
76
77
|
# File 'lib/rubysh/subprocess/parallel_io.rb', line 73
# Call run_once repeatedly until there are no available writers and no
# available readers left. Returns nil.
def run
  # Writers are checked first; readers are only consulted once no writer
  # is available.
  run_once until available_writers.empty? && available_readers.empty?
end
|
#run_once(timeout = nil) ⇒ Object
This method is a stub so it can be extended in subclasses
80
81
82
|
# File 'lib/rubysh/subprocess/parallel_io.rb', line 80
# Perform a single pass of run_select_loop, forwarding timeout unchanged
# (nil when the caller does not supply one).
#
# This method is a stub so it can be extended in subclasses.
def run_once(timeout=nil)
run_select_loop(timeout)
end
|
#run_select_loop(timeout) ⇒ Object
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
# File 'lib/rubysh/subprocess/parallel_io.rb', line 84
# Do one IO.select over the currently available readers and writers and
# service whatever came back ready.
#
# timeout is passed straight to IO.select. Raises NothingAvailable when
# the select returns nothing. Ready readers are serviced via
# read_available before ready writers are serviced via write_available.
def run_select_loop(timeout)
  readers = available_readers
  writers = available_writers

  begin
    ready = IO.select(readers, writers, nil, timeout)
  rescue Errno::EINTR
    # Interrupted by a signal: select again on the same lists.
    retry
  end
  raise NothingAvailable unless ready

  readable, writable, _ = ready
  readable.each { |reader| read_available(reader) }
  writable.each { |writer| write_available(writer) }
end
|
#write(writer_name, data, close_on_complete = true) ⇒ Object
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/rubysh/subprocess/parallel_io.rb', line 36
# Queue data for the writer registered under writer_name.
#
# The data is appended to that writer's buffer (created on first use),
# close_on_complete is recorded for the buffer, and the writer is then
# handed to finalize_writer_if_done, whose result is returned.
#
# Raises Rubysh::Error::AlreadyClosedError if an earlier write to the
# same writer passed a truthy close_on_complete (which is the default).
def write(writer_name, data, close_on_complete=true)
  writer = writer_by_name(writer_name)

  unless @writer_buffers[writer]
    @writer_buffers[writer] = {:data => '', :close_on_complete => nil}
  end
  state = @writer_buffers[writer]

  if state[:close_on_complete]
    raise Rubysh::Error::AlreadyClosedError.new("You have already marked #{writer.inspect} as close_on_complete; can't write more data")
  end

  state[:close_on_complete] = close_on_complete
  # Build a new string rather than mutating the buffered one in place.
  state[:data] = state[:data] + data
  finalize_writer_if_done(writer)
end
|