Class: ObjectChannel

Inherits:
Object
  • Object
show all
Defined in:
lib/object-channel.rb,
lib/object-channel/version.rb

Overview

require “object-channel/version”

Constant Summary collapse

VERSION =
"0.1.5"

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Class Attribute Details

.forkerObject



110
111
112
# File 'lib/object-channel.rb', line 110

def forker
  (@forker || ::Kernel)
end

Instance Attribute Details

#pidObject (readonly)

Returns the value of attribute pid.



115
116
117
# File 'lib/object-channel.rb', line 115

def pid
  @pid
end

Class Method Details

.create(endpoints = 2) ⇒ Object

Create object channels endpoints, each connected by an object channel.



8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/object-channel.rb', line 8

def create(endpoints=2)
  endpoints.times.to_a.map do |index|
    read, write = IO.pipe
    write.sync = true
    {:index=>index, :pipe=>[read, write]}
  end.permutation(2).to_a.inject(Hash.new{|h,k|h[k]=Hash.new}) do |channels,pipes|
    from = pipes[0]
    to = pipes[1]
    channels[from[:index]][to[:index]] = ObjectChannel.new( from[:pipe], to[:pipe] )
    channels
  end
end

.fork(name = nil, &block) ⇒ Object

Creates a subprocess with a bi-directional ObjectChannel available to both processes, behaving otherwise like Process.fork.

If a block is specified, the block is run in the subprocess with the ObjectChannel::Pair to its parent as its only argument. Like Process.fork(), the subprocess terminates with a status of zero.

Otherwise, the fork call returns twice, once in the parent, returning the ObjectChannel to its child whose pid method is the process ID of that child, and once in the child, returning the ObjectChannel to the parent, whose pid method returns nil.

:call-seq:

ObjectChannel.fork -> ObjectChannel x 2
ObjectChannel.fork{|channel|} -> ObjectChannel

Example (Block)

p2c = ObjectChannel.fork do |c2p|
  c2p.transmit :foo => :bar
  puts "child received <#{p2c.receive!.inspect}>" #=> [1, 2, 3]
end
puts "parent received <#{p2c.receive!.inspect}>" #=> {:foo=>:bar}
channel.transmit [1,2,3]

Example (No Block)

if (channel = ObjectChannel.fork).pid
  channel.transmit :foo => :bar
  puts "child received <#{channel.receive!.inspect}>" #=> [1, 2, 3]
else
  puts "received <#{channel.receive!.inspect}>" #=> {:foo=>:bar}
  channel.transmit [1,2,3]
  exit
end


72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/object-channel.rb', line 72

def fork name=nil, &block
  channels = create(2)
  channel_a = channels[0][1]
  channel_b = channels[1][0]

  if block_given?
    ret = fork name
    if ret.nil? or (ret.respond_to?(:pid) and ret.pid.nil?)
      begin
        block.call(*(ret.respond_to?(:pid) ? [ret] : [] ) )
      ensure
        exit!
      end
    end
    return ret
  else
    pid = forker.fork
    channel = ( pid.nil? ? channel_a : channel_b)
    if name.nil?
      channel.instance_variable_set(:@pid, pid)
      channel
    else
      Object.const_set name, channel
      pid
    end
  end
end

.new(receive_pipe, transmit_pipe) ⇒ Object

:nodoc:



22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/object-channel.rb', line 22

def new(receive_pipe,transmit_pipe)
  object_channel = allocate
  [
  :@receive_reader,
  :@receive_writer,
  :@transmit_reader,
  :@transmit_writer
  ].zip([receive_pipe,transmit_pipe].flatten(1)) do |name,ios|
    object_channel.instance_variable_set(name,ios)
  end
  object_channel
end

Instance Method Details

#close(ios_name) ⇒ Object

Close an IO by name



189
190
191
192
# File 'lib/object-channel.rb', line 189

def close(ios_name)
  ios = instance_variable_get(ios_name)
  ios.close unless ios.closed?
end

#close!Object

Close all ends of all IOS.



177
178
179
180
181
182
183
184
# File 'lib/object-channel.rb', line 177

def close!
  [
    :@receive_reader,
    :@receive_writer,
    :@transmit_reader,
    :@transmit_writer
  ].each &method(:close)
end

#closed?Boolean

Test to see if the receive_reader is at eof in a non-blocking manner

Returns:

  • (Boolean)


163
164
165
166
167
168
169
170
171
172
# File 'lib/object-channel.rb', line 163

def closed?
  c = Thread.start{@receive_reader.eof?}
  Thread.pass
  if c.status == false and c.value
    return true
  else
    c.kill
    return false
  end
end

#ready?Boolean

Test to see if there is anything to receive

Returns:

  • (Boolean)


148
149
150
151
152
153
154
155
156
157
158
# File 'lib/object-channel.rb', line 148

def ready?
  c = Thread.start{@receive_reader.getc}
  Thread.pass
  if c.status == false and c.value
    @receive_reader.ungetc(c.value)
    return true
  else
    c.kill
    return false
  end
end

#receiveObject

Receive the next object in a non-blocking manner Returns nil if no object was present



141
142
143
# File 'lib/object-channel.rb', line 141

def receive
  receive! if ready?
end

#receive!Object

Receive the next object in a blocking manner



128
129
130
131
132
133
134
135
136
# File 'lib/object-channel.rb', line 128

def receive!
  close(:@receive_writer)
  begin
    r = @receive_reader.readline
    Marshal.load(r.unpack("m")[0])
  rescue EOFError
    return nil
  end
end

#transmit(object) ⇒ Object

Transmit an object



120
121
122
123
# File 'lib/object-channel.rb', line 120

def transmit(object)
  close(:@transmit_reader)
  @transmit_writer.write [Marshal.dump(object)].pack('m').gsub("\n",'') + "\n"
end