Class: Distributor::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/distributor/server.rb

Instance Method Summary collapse

Constructor Details

#initialize(input, output = input) ⇒ Server

Returns a new instance of Server.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/distributor/server.rb', line 10

def initialize(input, output=input)
  @connector   = Distributor::Connector.new
  @multiplexer = Distributor::Multiplexer.new(output)
  @on_command  = Proc.new {}

  # reserve a command channel
  @multiplexer.reserve(0)

  # feed data from the input channel into the multiplexer
  @connector.handle(input) do |io|
    @multiplexer.input io
  end

  @connector.on_close(input) do |io|
    exit 0
  end

  # handle the command channel of the multiplexer
  @connector.handle(@multiplexer.reader(0)) do |io|
    append_json(io.readpartial(4096))

    dequeue_json do |data|
      case command = data["command"]
      when "socket" then
        path = data["path"]
        ch = socket(path)
        ack data["id"], "ch" => ch, "path" => path
      when "tunnel" then
        port = (data["port"] || ENV["PORT"] || 5000).to_i
        ch = tunnel(port)
        ack data["id"], "ch" => ch, "port" => port
      when "close" then
        @multiplexer.close data["ch"]
      when "run" then
        ch = run(data["args"])
        @multiplexer.output 0, Distributor::OkJson.encode({ "id" => data["id"], "command" => "ack", "ch" => ch })
      else
        @on_command.call command, data
      end
    end
  end
end

Instance Method Details

#ack(id, options = {}) ⇒ Object



53
54
55
# File 'lib/distributor/server.rb', line 53

def ack(id, options={})
  @multiplexer.output 0, Distributor::OkJson.encode(options.merge({ "id" => id, "command" => "ack" }))
end

#command(command, data = {}) ⇒ Object



111
112
113
114
115
116
# File 'lib/distributor/server.rb', line 111

def command(command, data={})
  data["id"] ||= @multiplexer.generate_id
  data["command"] = command
  @multiplexer.output 0, Distributor::OkJson.encode(data)
  data["id"]
end

#handle_socket(ch, socket) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/distributor/server.rb', line 81

def handle_socket(ch, socket)
  # handle data incoming from process
  @connector.handle(socket) do |io|
    begin
      @multiplexer.output(ch, io.readpartial(4096))
    rescue EOFError
      @multiplexer.close(ch)
      @connector.close(io)
    end
  end

  # handle data incoming on the multiplexer
  @connector.handle(@multiplexer.reader(ch)) do |input_io|
    data = input_io.readpartial(4096)
    socket.write data
  end

  ch
end

#on_command(&blk) ⇒ Object



122
123
124
# File 'lib/distributor/server.rb', line 122

def on_command(&blk)
  @on_command = blk
end

#output(ch, data = {}) ⇒ Object



118
119
120
# File 'lib/distributor/server.rb', line 118

def output(ch, data={})
  @multiplexer.output ch, Distributor::OkJson.encode(data)
end

#run(command) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/distributor/server.rb', line 57

def run(command)
  ch = @multiplexer.reserve

  pipe = IO.popen(command, "w+")

  # handle data incoming from process
  @connector.handle(pipe) do |io|
    begin
      @multiplexer.output(ch, io.readpartial(4096))
    rescue EOFError
      @multiplexer.close(ch)
      @connector.close(io)
    end
  end

  # handle data incoming on the multiplexer
  @connector.handle(@multiplexer.reader(ch)) do |input_io|
    data = input_io.readpartial(4096)
    pipe.write data
  end

  ch
end

#socket(path) ⇒ Object



101
102
103
104
# File 'lib/distributor/server.rb', line 101

def socket(path)
  ch = @multiplexer.reserve
  handle_socket ch, UNIXSocket.new(path)
end

#startObject



126
127
128
129
130
131
132
133
134
135
136
# File 'lib/distributor/server.rb', line 126

def start
  @multiplexer.output 0, Distributor::OkJson.encode({ "command" => "hello" })

  loop do
    begin
      @connector.listen
    rescue Exception => ex
      @multiplexer.output 0, Distributor::OkJson.encode({ "command" => "error", "message" => ex.message, "backtrace" => ex.backtrace.first })
    end
  end
end

#tunnel(port) ⇒ Object



106
107
108
109
# File 'lib/distributor/server.rb', line 106

def tunnel(port)
  ch = @multiplexer.reserve
  handle_socket ch, TCPSocket.new("localhost", port)
end