Class: ObjectChannel
- Inherits:
-
Object
- Object
- ObjectChannel
- Defined in:
- lib/object-channel.rb,
lib/object-channel/version.rb
Overview
require “object-channel/version”
Constant Summary collapse
- VERSION =
"0.1.5"
Class Attribute Summary collapse
Instance Attribute Summary collapse
-
#pid ⇒ Object
readonly
Returns the value of attribute pid.
Class Method Summary collapse
-
.create(endpoints = 2) ⇒ Object
Create object channels endpoints, each connected by an object channel.
-
.fork(name = nil, &block) ⇒ Object
Creates a subprocess with a bi-directional ObjectChannel available to both processes, behaving otherwise like Process.fork.
-
.new(receive_pipe, transmit_pipe) ⇒ Object
:nodoc:.
Instance Method Summary collapse
-
#close(ios_name) ⇒ Object
Close an IO by name.
-
#close! ⇒ Object
Close all ends of all IOS.
-
#closed? ⇒ Boolean
Test to see if the receive_reader is at eof in a non-blocking manner.
-
#ready? ⇒ Boolean
Test to see if there is anything to receive.
-
#receive ⇒ Object
Receive the next object in a non-blocking manner Returns nil if no object was present.
-
#receive! ⇒ Object
Receive the next object in a blocking manner.
-
#transmit(object) ⇒ Object
Transmit an object.
Class Attribute Details
.forker ⇒ Object
110 111 112 |
# File 'lib/object-channel.rb', line 110 def forker (@forker || ::Kernel) end |
Instance Attribute Details
#pid ⇒ Object (readonly)
Returns the value of attribute pid.
115 116 117 |
# File 'lib/object-channel.rb', line 115 def pid @pid end |
Class Method Details
.create(endpoints = 2) ⇒ Object
Create object channels endpoints, each connected by an object channel.
8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/object-channel.rb', line 8 def create(endpoints=2) endpoints.times.to_a.map do |index| read, write = IO.pipe write.sync = true {:index=>index, :pipe=>[read, write]} end.permutation(2).to_a.inject(Hash.new{|h,k|h[k]=Hash.new}) do |channels,pipes| from = pipes[0] to = pipes[1] channels[from[:index]][to[:index]] = ObjectChannel.new( from[:pipe], to[:pipe] ) channels end end |
.fork(name = nil, &block) ⇒ Object
Creates a subprocess with a bi-directional ObjectChannel available to both processes, behaving otherwise like Process.fork.
If a block is specified, the block is run in the subprocess with the ObjectChannel::Pair to its parent as its only argument. Like Process.fork(), the subprocess terminates with a status of zero.
Otherwise, the fork call returns twice, once in the parent, returning the ObjectChannel to its child whose pid method is the process ID of that child, and once in the child, returning the ObjectChannel to the parent, whose pid method returns nil.
:call-seq:
ObjectChannel.fork -> ObjectChannel x 2
ObjectChannel.fork{|channel|} -> ObjectChannel
Example (Block)
p2c = ObjectChannel.fork do |c2p|
c2p.transmit :foo => :bar
puts "child received <#{p2c.receive!.inspect}>" #=> [1, 2, 3]
end
puts "parent received <#{p2c.receive!.inspect}>" #=> {:foo=>:bar}
channel.transmit [1,2,3]
Example (No Block)
if (channel = ObjectChannel.fork).pid
channel.transmit :foo => :bar
puts "child received <#{channel.receive!.inspect}>" #=> [1, 2, 3]
else
puts "received <#{channel.receive!.inspect}>" #=> {:foo=>:bar}
channel.transmit [1,2,3]
exit
end
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/object-channel.rb', line 72 def fork name=nil, &block channels = create(2) channel_a = channels[0][1] channel_b = channels[1][0] if block_given? ret = fork name if ret.nil? or (ret.respond_to?(:pid) and ret.pid.nil?) begin block.call(*(ret.respond_to?(:pid) ? [ret] : [] ) ) ensure exit! end end return ret else pid = forker.fork channel = ( pid.nil? ? channel_a : channel_b) if name.nil? channel.instance_variable_set(:@pid, pid) channel else Object.const_set name, channel pid end end end |
.new(receive_pipe, transmit_pipe) ⇒ Object
:nodoc:
22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/object-channel.rb', line 22 def new(receive_pipe,transmit_pipe) object_channel = allocate [ :@receive_reader, :@receive_writer, :@transmit_reader, :@transmit_writer ].zip([receive_pipe,transmit_pipe].flatten(1)) do |name,ios| object_channel.instance_variable_set(name,ios) end object_channel end |
Instance Method Details
#close(ios_name) ⇒ Object
Close an IO by name
189 190 191 192 |
# File 'lib/object-channel.rb', line 189 def close(ios_name) ios = instance_variable_get(ios_name) ios.close unless ios.closed? end |
#close! ⇒ Object
Close all ends of all IOS.
177 178 179 180 181 182 183 184 |
# File 'lib/object-channel.rb', line 177 def close! [ :@receive_reader, :@receive_writer, :@transmit_reader, :@transmit_writer ].each &method(:close) end |
#closed? ⇒ Boolean
Test to see if the receive_reader is at eof in a non-blocking manner
163 164 165 166 167 168 169 170 171 172 |
# File 'lib/object-channel.rb', line 163 def closed? c = Thread.start{@receive_reader.eof?} Thread.pass if c.status == false and c.value return true else c.kill return false end end |
#ready? ⇒ Boolean
Test to see if there is anything to receive
148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/object-channel.rb', line 148 def ready? c = Thread.start{@receive_reader.getc} Thread.pass if c.status == false and c.value @receive_reader.ungetc(c.value) return true else c.kill return false end end |
#receive ⇒ Object
Receive the next object in a non-blocking manner Returns nil if no object was present
141 142 143 |
# File 'lib/object-channel.rb', line 141 def receive receive! if ready? end |
#receive! ⇒ Object
Receive the next object in a blocking manner
128 129 130 131 132 133 134 135 136 |
# File 'lib/object-channel.rb', line 128 def receive! close(:@receive_writer) begin r = @receive_reader.readline Marshal.load(r.unpack("m")[0]) rescue EOFError return nil end end |
#transmit(object) ⇒ Object
Transmit an object
120 121 122 123 |
# File 'lib/object-channel.rb', line 120 def transmit(object) close(:@transmit_reader) @transmit_writer.write [Marshal.dump(object)].pack('m').gsub("\n",'') + "\n" end |