Class: Distributor::Multiplexer
- Inherits:
-
Object
- Object
- Distributor::Multiplexer
- Defined in:
- lib/distributor/multiplexer.rb
Instance Method Summary collapse
- #close(ch) ⇒ Object
- #generate_id ⇒ Object
-
#initialize(output) ⇒ Multiplexer
constructor
A new instance of Multiplexer.
- #input(io) ⇒ Object
- #output(ch, data) ⇒ Object
- #reader(ch) ⇒ Object
- #reserve(ch = nil) ⇒ Object
- #writer(ch) ⇒ Object
Constructor Details
#initialize(output) ⇒ Multiplexer
Returns a new instance of Multiplexer.
8 9 10 11 12 13 14 15 |
# File 'lib/distributor/multiplexer.rb', line 8 def initialize(output) @output = output @readers = {} @writers = {} @write_lock = Mutex.new @output.sync = true end |
Instance Method Details
#close(ch) ⇒ Object
47 48 49 50 |
# File 'lib/distributor/multiplexer.rb', line 47 def close(ch) output 0, Distributor::OkJson.encode({ "command" => "close", "ch" => ch }) rescue IOError end |
#generate_id ⇒ Object
52 53 54 |
# File 'lib/distributor/multiplexer.rb', line 52 def generate_id id = "#{Time.now.to_f}-#{rand(10000)}" end |
#input(io) ⇒ Object
32 33 34 35 36 37 38 |
# File 'lib/distributor/multiplexer.rb', line 32 def input(io) ch, data = Distributor::Packet.parse(io) return if ch.nil? writer(ch).write data rescue IOError output 0, Distributor::OkJson.encode({ "command" => "close", "ch" => ch }) end |
#output(ch, data) ⇒ Object
40 41 42 43 44 45 |
# File 'lib/distributor/multiplexer.rb', line 40 def output(ch, data) @write_lock.synchronize do Distributor::Packet.write(@output, ch, data) end rescue Errno::EPIPE end |
#reader(ch) ⇒ Object
24 25 26 |
# File 'lib/distributor/multiplexer.rb', line 24 def reader(ch) @readers[ch] || raise("no such channel: #{ch}") end |
#reserve(ch = nil) ⇒ Object
17 18 19 20 21 22 |
# File 'lib/distributor/multiplexer.rb', line 17 def reserve(ch=nil) ch ||= @readers.keys.length raise "channel already taken: #{ch}" if @readers.has_key?(ch) @readers[ch], @writers[ch] = IO.pipe ch end |
#writer(ch) ⇒ Object
28 29 30 |
# File 'lib/distributor/multiplexer.rb', line 28 def writer(ch) @writers[ch] || raise("no such channel: #{ch}") end |