Class: Distributor::Multiplexer

Inherits:
Object
  • Object
show all
Defined in:
lib/distributor/multiplexer.rb

Instance Method Summary collapse

Constructor Details

#initialize(output) ⇒ Multiplexer

Returns a new instance of Multiplexer.



8
9
10
11
12
13
14
15
# File 'lib/distributor/multiplexer.rb', line 8

def initialize(output)
  @output  = output
  @readers = {}
  @writers = {}
  @write_lock = Mutex.new

  @output.sync = true
end

Instance Method Details

#close(ch) ⇒ Object



47
48
49
50
# File 'lib/distributor/multiplexer.rb', line 47

def close(ch)
  output 0, Distributor::OkJson.encode({ "command" => "close", "ch" => ch })
rescue IOError
end

#generate_idObject



52
53
54
# File 'lib/distributor/multiplexer.rb', line 52

def generate_id
  id = "#{Time.now.to_f}-#{rand(10000)}"
end

#input(io) ⇒ Object



32
33
34
35
36
37
38
# File 'lib/distributor/multiplexer.rb', line 32

def input(io)
  ch, data = Distributor::Packet.parse(io)
  return if ch.nil?
  writer(ch).write data
rescue IOError
  output 0, Distributor::OkJson.encode({ "command" => "close", "ch" => ch })
end

#output(ch, data) ⇒ Object



40
41
42
43
44
45
# File 'lib/distributor/multiplexer.rb', line 40

def output(ch, data)
  @write_lock.synchronize do
    Distributor::Packet.write(@output, ch, data)
  end
rescue Errno::EPIPE
end

#reader(ch) ⇒ Object



24
25
26
# File 'lib/distributor/multiplexer.rb', line 24

def reader(ch)
  @readers[ch] || raise("no such channel: #{ch}")
end

#reserve(ch = nil) ⇒ Object



17
18
19
20
21
22
# File 'lib/distributor/multiplexer.rb', line 17

def reserve(ch=nil)
  ch ||= @readers.keys.length
  raise "channel already taken: #{ch}" if @readers.has_key?(ch)
  @readers[ch], @writers[ch] = IO.pipe
  ch
end

#writer(ch) ⇒ Object



28
29
30
# File 'lib/distributor/multiplexer.rb', line 28

def writer(ch)
  @writers[ch] || raise("no such channel: #{ch}")
end