Class: DigitalFabric::AgentProxy

Inherits:
Object
  • Object
show all
Defined in:
lib/tipi/digital_fabric/agent_proxy.rb

Defined Under Namespace

Classes: GracefulShutdown, TimeoutError

Constant Summary collapse

HTTP_RESPONSE_UPGRADE_HEADERS =
{ ':status' => Qeweney::Status::SWITCHING_PROTOCOLS }

Instance Method Summary collapse

Constructor Details

#initialize(service, req) ⇒ AgentProxy

Returns a new instance of AgentProxy.



9
10
11
12
13
14
15
16
17
18
19
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 9

def initialize(service, req)
  @service = service
  @req = req
  @conn = req.adapter.conn
  @msgpack_reader = MessagePack::Unpacker.new
  @requests = {}
  @current_request_count = 0
  @last_request_id = 0
  @last_recv = @last_send = Time.now
  run
end

Instance Method Details

#current_request_countObject



21
22
23
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 21

def current_request_count
  @current_request_count
end

#handle_stats_request(id) ⇒ Object



199
200
201
202
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 199

def handle_stats_request(id)
  stats = @service.get_stats
  send_df_message(Protocol.stats_response(id, stats))
end

#handle_websocket_upgrade(req) ⇒ Object



288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 288

def handle_websocket_upgrade(req)
  with_request do |id|
    send_df_message(Protocol.ws_request(id, req.headers))
    response = receive
    case response[0]
    when Protocol::WS_RESPONSE
      headers = response[2] || {}
      status = headers[':status'] || Qeweney::Status::SWITCHING_PROTOCOLS
      if status != Qeweney::Status::SWITCHING_PROTOCOLS
        req.respond(nil, headers)
        return
      end
      ws = Tipi::Websocket.new(req.adapter.conn, req.headers)
      run_websocket_connection(id, ws)
    else
      req.respond(nil, ':status' => Qeweney::Status::SERVICE_UNAVAILABLE)
    end
  end
rescue IOError, SystemCallError
  # ignore
end

#http_custom_upgrade(id, req, headers) ⇒ Object



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 206

def http_custom_upgrade(id, req, headers)
  # send upgrade response
  upgrade_headers = headers ?
   headers.merge(HTTP_RESPONSE_UPGRADE_HEADERS) :
    HTTP_RESPONSE_UPGRADE_HEADERS
  req.send_headers(upgrade_headers, true)

  conn = req.adapter.conn
  reader = spin("#{Fiber.current.tag}.#{id}") do
    conn.recv_loop do |data|
      send_df_message(Protocol.conn_data(id, data))
    end
  end
  while (message = receive)
    return if http_custom_upgrade_message(conn, message)
  end
ensure
  reader.stop
end

#http_custom_upgrade_message(conn, message) ⇒ Object



226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 226

def http_custom_upgrade_message(conn, message)
  case message[Protocol::Attribute::KIND]
  when Protocol::CONN_DATA
    conn << message[:Protocol::Attribute::ConnData::DATA]
    false
  when Protocol::CONN_CLOSE
    true
  else
    # invalid message
    true
  end
end

#http_get_request_body(id, req, limit) ⇒ Object



261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 261

def http_get_request_body(id, req, limit)
  case limit
  when nil
    body = req.read
  else
    limit = limit.to_i
    body = nil
    req.each_chunk do |chunk|
      (body ||= +'') << chunk
      break if body.bytesize >= limit
    end
  end
  send_df_message(Protocol.http_request_body(id, body, req.complete?))
end

#http_request(req) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 143

def http_request(req)
  t0 = Time.now
  t1 = nil
  with_request do |id|
    msg = Protocol.http_request(id, req.headers, req.next_chunk(true), req.complete?)
    send_df_message(msg)
    while (message = receive)
      kind = message[Protocol::Attribute::KIND]
      unless t1
        t1 = Time.now
        if kind == Protocol::HTTP_RESPONSE
          headers = message[Protocol::Attribute::HttpResponse::HEADERS]
          status = (headers && headers[':status']) || 200
          if status < Qeweney::Status::BAD_REQUEST
            @service.record_latency_measurement(t1 - t0, req)
          end
        end
      end
      attributes = message[Protocol::Attribute::HttpRequest::HEADERS..-1]
      return if http_request_message(id, req, kind, attributes)
    end
  end
rescue => e
  p "Internal server error: #{e.inspect}"
  puts e.backtrace.join("\n")
  http_request_send_error_response(e)
end

#http_request_message(id, req, kind, message) ⇒ Boolean

Returns true if response is complete.

Returns:

  • (Boolean)

    true if response is complete



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 179

def http_request_message(id, req, kind, message)
  case kind
  when Protocol::HTTP_UPGRADE
    http_custom_upgrade(id, req, *message)
    true
  when Protocol::HTTP_GET_REQUEST_BODY
    http_get_request_body(id, req, *message)
    false
  when Protocol::HTTP_RESPONSE
    http_response(id, req, *message)
  else
    # invalid message
    true
  end
end

#http_request_send_error_response(error) ⇒ Object



171
172
173
174
175
176
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 171

def http_request_send_error_response(error)
  response = format("Error: %s\n%s", error.inspect, error.backtrace.join("\n"))
  req.respond(response, ':status' => Qeweney::Status::INTERNAL_SERVER_ERROR)
rescue IOError, SystemCallError
  # ignore
end

#http_response(id, req, body, headers, complete, transfer_count_key) ⇒ Object



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 239

def http_response(id, req, body, headers, complete, transfer_count_key)
  if !req.headers_sent? && complete
    req.respond(body, headers|| {})
    if transfer_count_key
      rx, tx = req.transfer_counts
      send_transfer_count(transfer_count_key, rx, tx)
    end
    true
  else
    req.send_headers(headers) if headers && !req.headers_sent?
    req.send_chunk(body, done: complete) if body or complete

    if complete && transfer_count_key
      rx, tx = req.transfer_counts
      send_transfer_count(transfer_count_key, rx, tx)
    end
    complete
  end
rescue IOError, SystemCallError
  # ignore error
end

#http_upgrade(req, protocol) ⇒ Object



276
277
278
279
280
281
282
283
284
285
286
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 276

def http_upgrade(req, protocol)
  if protocol == 'websocket'
    handle_websocket_upgrade(req)
  else
    # other protocol upgrades should be handled by the agent, so we just run
    # the request normally. The agent is expected to upgrade the connection
    # using a http_upgrade message. From that moment on, two-way
    # communication is handled using conn_data and conn_close messages.
    http_request(req)
  end
end

#keep_aliveObject



68
69
70
71
72
73
74
75
76
77
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 68

def keep_alive
  now = Time.now
  if now - @last_send >= Protocol::SEND_TIMEOUT
    send_df_message(Protocol.ping)
  end
  # if now - @last_recv >= Protocol::RECV_TIMEOUT
  #   raise TimeoutError
  # end
rescue TimeoutError, IOError
end

#process_incoming_messages(shutdown = false) ⇒ Object



45
46
47
48
49
50
51
52
53
54
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 45

def process_incoming_messages(shutdown = false)
  return if shutdown && @requests.empty?

  @conn.feed_loop(@msgpack_reader, :feed_each) do |msg|
    recv_df_message(msg)
    return if shutdown && @requests.empty?
  end
rescue TimeoutError, IOError, SystemCallError
  # ignore and just return in order to terminate the proxy
end

#recv_df_message(message) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 92

def recv_df_message(message)
  @last_recv = Time.now
  # puts "<<< #{message.inspect}"

  case message[Protocol::Attribute::KIND]
  when Protocol::PING
    return
  when Protocol::UNMOUNT
    return unmount
  when Protocol::STATS_REQUEST
    return handle_stats_request(message[Protocol::Attribute::ID])
  end

  handler = @requests[message[Protocol::Attribute::ID]]
  if !handler
    # puts "Unknown request id in #{message}"
    return
  end

  handler << message
end

#register_request_fiberObject

HTTP / WebSocket



123
124
125
126
127
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 123

def register_request_fiber
  id = (@last_request_id += 1)
  @requests[id] = Fiber.current
  id
end

#routeObject



79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 79

def route
  case @req.headers['df-mount']
  when /^\s*host\s*=\s*([^\s]+)/
    { host: Regexp.last_match(1) }
  when /^\s*path\s*=\s*([^\s]+)/
    { path: Regexp.last_match(1) }
  when /catch_all/
    { catch_all: true }
  else
    nil
  end
end

#runObject



31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 31

def run
  @fiber = Fiber.current
  @service.mount(route, self)
  @mounted = true
  # keep_alive_timer = spin_loop("#{@fiber.tag}-keep_alive", interval: 5) { keep_alive }
  process_incoming_messages(false)
rescue GracefulShutdown
  puts "Proxy got graceful shutdown, left: #{@requests.size} requests" if @requests.size > 0
  move_on_after(15) { process_incoming_messages(true) }
ensure
  # keep_alive_timer&.stop
  unmount
end

#run_websocket_connection(id, websocket) ⇒ Object



310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 310

def run_websocket_connection(id, websocket)
  reader = spin("#{Fiber.current}.#{id}-ws") do
    websocket.recv_loop do |data|
      send_df_message(Protocol.ws_data(id, data))
    end
  end
  while (message = receive)
    case message[Protocol::Attribute::KIND]
    when Protocol::WS_DATA
      websocket << message[Protocol::Attribute::WS::DATA]
    when Protocol::WS_CLOSE
      return
    else
      raise "Unexpected websocket message #{message.inspect}"
    end
  end
ensure
  reader.stop
  websocket.close
end

#send_df_message(message) ⇒ Object



114
115
116
117
118
119
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 114

def send_df_message(message)
  # puts ">>> #{message.inspect}" unless message[Protocol::Attribute::KIND] == Protocol::PING

  @last_send = Time.now
  @conn << message.to_msgpack
end

#send_shutdownObject



63
64
65
66
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 63

def send_shutdown
  send_df_message(Protocol.shutdown)
  @fiber.raise GracefulShutdown.new
end

#send_transfer_count(key, rx, tx) ⇒ Object



195
196
197
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 195

def send_transfer_count(key, rx, tx)
  send_df_message(Protocol.transfer_count(key, rx, tx))
end

#unmountObject



56
57
58
59
60
61
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 56

def unmount
  return unless @mounted

  @service.unmount(self)
  @mounted = nil
end

#unregister_request_fiber(id) ⇒ Object



129
130
131
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 129

def unregister_request_fiber(id)
  @requests.delete(id)
end

#with_requestObject



133
134
135
136
137
138
139
140
141
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 133

def with_request
  @current_request_count += 1
  id = (@last_request_id += 1)
  @requests[id] = Fiber.current
  yield id
ensure
  @current_request_count -= 1
  @requests.delete(id)
end