Class: Distributor::Client
- Inherits:
-
Object
- Object
- Distributor::Client
- Defined in:
- lib/distributor/client.rb
Instance Method Summary collapse
- #command(command, data = {}) ⇒ Object
- #hookup(ch, input, output = input) ⇒ Object
-
#initialize(input, output = input) ⇒ Client
constructor
A new instance of Client.
- #on_close(ch, &blk) ⇒ Object
- #on_command(&blk) ⇒ Object
- #on_hello(&blk) ⇒ Object
- #output(ch, data) ⇒ Object
- #run(command, &handler) ⇒ Object
- #socket(path, &handler) ⇒ Object
- #start ⇒ Object
- #tunnel(port, &handler) ⇒ Object
Constructor Details
#initialize(input, output = input) ⇒ Client
Returns a new instance of Client.
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/distributor/client.rb', line 9 def initialize(input, output=input) @connector = Distributor::Connector.new @multiplexer = Distributor::Multiplexer.new(output) @handlers = {} @processes = [] @on_close = Hash.new { |hash,key| hash[key] = Array.new } @on_hello = [] @on_command = Proc.new {} @hookup_lock = Mutex.new # reserve a command channel @multiplexer.reserve(0) # feed data from the input channel into the multiplexer @connector.handle(input) do |io| @multiplexer.input io end @connector.on_close(input) do |io| exit 0 end # handle the command channel of the multiplexer @connector.handle(@multiplexer.reader(0)) do |io| append_json(io.readpartial(4096)) dequeue_json do |data| case command = data["command"] when "hello" then @on_hello.each { |c| c.call } when "close" then ch = data["ch"] @on_close[ch].each { |c| c.call(ch) } when "ack" then ch = data["ch"] @multiplexer.reserve ch @handlers[data["id"]].call(ch) @handlers.delete(data["id"]) @processes << ch else @on_command.call(command, data) end end end end |
Instance Method Details
#command(command, data = {}) ⇒ Object
59 60 61 62 63 64 |
# File 'lib/distributor/client.rb', line 59 def command(command, data={}) data["id"] ||= @multiplexer.generate_id data["command"] = command @multiplexer.output 0, Distributor::OkJson.encode(data) data["id"] end |
#hookup(ch, input, output = input) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/distributor/client.rb', line 81 def hookup(ch, input, output=input) @hookup_lock.synchronize do # handle data incoming on the multiplexer @connector.handle(@multiplexer.reader(ch)) do |io| begin data = io.readpartial(4096) # output.write "#{ch}: #{data}" output.write data output.flush rescue EOFError command "close", "ch" => ch end end end # handle data incoming from the input channel @connector.handle(input) do |io| begin data = io.readpartial(4096) @multiplexer.output ch, data rescue EOFError @on_close[ch].each { |c| c.call(ch) } @connector.close(io) end end end |
#on_close(ch, &blk) ⇒ Object
108 109 110 |
# File 'lib/distributor/client.rb', line 108 def on_close(ch, &blk) @on_close[ch] << blk end |
#on_command(&blk) ⇒ Object
116 117 118 |
# File 'lib/distributor/client.rb', line 116 def on_command(&blk) @on_command = blk end |
#on_hello(&blk) ⇒ Object
112 113 114 |
# File 'lib/distributor/client.rb', line 112 def on_hello(&blk) @on_hello << blk end |
#output(ch, data) ⇒ Object
55 56 57 |
# File 'lib/distributor/client.rb', line 55 def output(ch, data) @multiplexer.output ch, data end |
#run(command, &handler) ⇒ Object
66 67 68 69 |
# File 'lib/distributor/client.rb', line 66 def run(command, &handler) id = command("run", "args" => command) @handlers[id] = handler end |
#socket(path, &handler) ⇒ Object
71 72 73 74 |
# File 'lib/distributor/client.rb', line 71 def socket(path, &handler) id = command("socket", "path" => path) @handlers[id] = handler end |
#start ⇒ Object
120 121 122 |
# File 'lib/distributor/client.rb', line 120 def start loop { @connector.listen } end |
#tunnel(port, &handler) ⇒ Object
76 77 78 79 |
# File 'lib/distributor/client.rb', line 76 def tunnel(port, &handler) id = command("tunnel", "port" => port) @handlers[id] = handler end |