Class: Distributor::Server
- Inherits:
-
Object
- Object
- Distributor::Server
- Defined in:
- lib/distributor/server.rb
Instance Method Summary collapse
- #ack(id, options = {}) ⇒ Object
- #command(command, data = {}) ⇒ Object
- #handle_socket(ch, socket) ⇒ Object
-
#initialize(input, output = input) ⇒ Server
constructor
A new instance of Server.
- #on_command(&blk) ⇒ Object
- #output(ch, data = {}) ⇒ Object
- #run(command) ⇒ Object
- #socket(path) ⇒ Object
- #start ⇒ Object
- #tunnel(port) ⇒ Object
Constructor Details
#initialize(input, output = input) ⇒ Server
Returns a new instance of Server.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/distributor/server.rb', line 10 def initialize(input, output=input) @connector = Distributor::Connector.new @multiplexer = Distributor::Multiplexer.new(output) @on_command = Proc.new {} # reserve a command channel @multiplexer.reserve(0) # feed data from the input channel into the multiplexer @connector.handle(input) do |io| @multiplexer.input io end @connector.on_close(input) do |io| exit 0 end # handle the command channel of the multiplexer @connector.handle(@multiplexer.reader(0)) do |io| append_json(io.readpartial(4096)) dequeue_json do |data| case command = data["command"] when "socket" then path = data["path"] ch = socket(path) ack data["id"], "ch" => ch, "path" => path when "tunnel" then port = (data["port"] || ENV["PORT"] || 5000).to_i ch = tunnel(port) ack data["id"], "ch" => ch, "port" => port when "close" then @multiplexer.close data["ch"] when "run" then ch = run(data["args"]) @multiplexer.output 0, Distributor::OkJson.encode({ "id" => data["id"], "command" => "ack", "ch" => ch }) else @on_command.call command, data end end end end |
Instance Method Details
#ack(id, options = {}) ⇒ Object
53 54 55 |
# File 'lib/distributor/server.rb', line 53 def ack(id, ={}) @multiplexer.output 0, Distributor::OkJson.encode(.merge({ "id" => id, "command" => "ack" })) end |
#command(command, data = {}) ⇒ Object
111 112 113 114 115 116 |
# File 'lib/distributor/server.rb', line 111 def command(command, data={}) data["id"] ||= @multiplexer.generate_id data["command"] = command @multiplexer.output 0, Distributor::OkJson.encode(data) data["id"] end |
#handle_socket(ch, socket) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/distributor/server.rb', line 81 def handle_socket(ch, socket) # handle data incoming from process @connector.handle(socket) do |io| begin @multiplexer.output(ch, io.readpartial(4096)) rescue EOFError @multiplexer.close(ch) @connector.close(io) end end # handle data incoming on the multiplexer @connector.handle(@multiplexer.reader(ch)) do |input_io| data = input_io.readpartial(4096) socket.write data end ch end |
#on_command(&blk) ⇒ Object
122 123 124 |
# File 'lib/distributor/server.rb', line 122 def on_command(&blk) @on_command = blk end |
#output(ch, data = {}) ⇒ Object
118 119 120 |
# File 'lib/distributor/server.rb', line 118 def output(ch, data={}) @multiplexer.output ch, Distributor::OkJson.encode(data) end |
#run(command) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/distributor/server.rb', line 57 def run(command) ch = @multiplexer.reserve pipe = IO.popen(command, "w+") # handle data incoming from process @connector.handle(pipe) do |io| begin @multiplexer.output(ch, io.readpartial(4096)) rescue EOFError @multiplexer.close(ch) @connector.close(io) end end # handle data incoming on the multiplexer @connector.handle(@multiplexer.reader(ch)) do |input_io| data = input_io.readpartial(4096) pipe.write data end ch end |
#socket(path) ⇒ Object
101 102 103 104 |
# File 'lib/distributor/server.rb', line 101 def socket(path) ch = @multiplexer.reserve handle_socket ch, UNIXSocket.new(path) end |
#start ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/distributor/server.rb', line 126 def start @multiplexer.output 0, Distributor::OkJson.encode({ "command" => "hello" }) loop do begin @connector.listen rescue Exception => ex @multiplexer.output 0, Distributor::OkJson.encode({ "command" => "error", "message" => ex., "backtrace" => ex.backtrace.first }) end end end |
#tunnel(port) ⇒ Object
106 107 108 109 |
# File 'lib/distributor/server.rb', line 106 def tunnel(port) ch = @multiplexer.reserve handle_socket ch, TCPSocket.new("localhost", port) end |