Class: DigitalFabric::AgentProxy
- Inherits:
-
Object
- Object
- DigitalFabric::AgentProxy
- Defined in:
- lib/tipi/digital_fabric/agent_proxy.rb
Defined Under Namespace
Classes: GracefulShutdown, TimeoutError
Constant Summary collapse
- HTTP_RESPONSE_UPGRADE_HEADERS =
{ ':status' => Qeweney::Status::SWITCHING_PROTOCOLS }
Instance Method Summary collapse
- #current_request_count ⇒ Object
- #handle_stats_request(id) ⇒ Object
- #handle_websocket_upgrade(req) ⇒ Object
- #http_custom_upgrade(id, req, headers) ⇒ Object
- #http_custom_upgrade_message(conn, message) ⇒ Object
- #http_get_request_body(id, req, limit) ⇒ Object
- #http_request(req) ⇒ Object
-
#http_request_message(id, req, kind, message) ⇒ Boolean
True if response is complete.
- #http_request_send_error_response(error) ⇒ Object
- #http_response(id, req, body, headers, complete, transfer_count_key) ⇒ Object
- #http_upgrade(req, protocol) ⇒ Object
-
#initialize(service, req) ⇒ AgentProxy
constructor
A new instance of AgentProxy.
- #keep_alive ⇒ Object
- #process_incoming_messages(shutdown = false) ⇒ Object
- #recv_df_message(message) ⇒ Object
-
#register_request_fiber ⇒ Object
HTTP / WebSocket.
- #route ⇒ Object
- #run ⇒ Object
- #run_websocket_connection(id, websocket) ⇒ Object
- #send_df_message(message) ⇒ Object
- #send_shutdown ⇒ Object
- #send_transfer_count(key, rx, tx) ⇒ Object
- #unmount ⇒ Object
- #unregister_request_fiber(id) ⇒ Object
- #with_request ⇒ Object
Constructor Details
#initialize(service, req) ⇒ AgentProxy
Returns a new instance of AgentProxy.
9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 9 def initialize(service, req) @service = service @req = req @conn = req.adapter.conn @msgpack_reader = MessagePack::Unpacker.new @requests = {} @current_request_count = 0 @last_request_id = 0 @last_recv = @last_send = Time.now run end |
Instance Method Details
#current_request_count ⇒ Object
21 22 23 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 21 def current_request_count @current_request_count end |
#handle_stats_request(id) ⇒ Object
199 200 201 202 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 199 def handle_stats_request(id) stats = @service.get_stats (Protocol.stats_response(id, stats)) end |
#handle_websocket_upgrade(req) ⇒ Object
288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 288 def handle_websocket_upgrade(req) with_request do |id| (Protocol.ws_request(id, req.headers)) response = receive case response[0] when Protocol::WS_RESPONSE headers = response[2] || {} status = headers[':status'] || Qeweney::Status::SWITCHING_PROTOCOLS if status != Qeweney::Status::SWITCHING_PROTOCOLS req.respond(nil, headers) return end ws = Tipi::Websocket.new(req.adapter.conn, req.headers) run_websocket_connection(id, ws) else req.respond(nil, ':status' => Qeweney::Status::SERVICE_UNAVAILABLE) end end rescue IOError, SystemCallError # ignore end |
#http_custom_upgrade(id, req, headers) ⇒ Object
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 206 def http_custom_upgrade(id, req, headers) # send upgrade response upgrade_headers = headers ? headers.merge(HTTP_RESPONSE_UPGRADE_HEADERS) : HTTP_RESPONSE_UPGRADE_HEADERS req.send_headers(upgrade_headers, true) conn = req.adapter.conn reader = spin("#{Fiber.current.tag}.#{id}") do conn.recv_loop do |data| (Protocol.conn_data(id, data)) end end while ( = receive) return if (conn, ) end ensure reader.stop end |
#http_custom_upgrade_message(conn, message) ⇒ Object
226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 226 def (conn, ) case [Protocol::Attribute::KIND] when Protocol::CONN_DATA conn << [:Protocol::Attribute::ConnData::DATA] false when Protocol::CONN_CLOSE true else # invalid message true end end |
#http_get_request_body(id, req, limit) ⇒ Object
261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 261 def http_get_request_body(id, req, limit) case limit when nil body = req.read else limit = limit.to_i body = nil req.each_chunk do |chunk| (body ||= +'') << chunk break if body.bytesize >= limit end end (Protocol.http_request_body(id, body, req.complete?)) end |
#http_request(req) ⇒ Object
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 143 def http_request(req) t0 = Time.now t1 = nil with_request do |id| msg = Protocol.http_request(id, req.headers, req.next_chunk(true), req.complete?) (msg) while ( = receive) kind = [Protocol::Attribute::KIND] unless t1 t1 = Time.now if kind == Protocol::HTTP_RESPONSE headers = [Protocol::Attribute::HttpResponse::HEADERS] status = (headers && headers[':status']) || 200 if status < Qeweney::Status::BAD_REQUEST @service.record_latency_measurement(t1 - t0, req) end end end attributes = [Protocol::Attribute::HttpRequest::HEADERS..-1] return if (id, req, kind, attributes) end end rescue => e p "Internal server error: #{e.inspect}" puts e.backtrace.join("\n") http_request_send_error_response(e) end |
#http_request_message(id, req, kind, message) ⇒ Boolean
Returns true if response is complete.
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 179 def (id, req, kind, ) case kind when Protocol::HTTP_UPGRADE http_custom_upgrade(id, req, *) true when Protocol::HTTP_GET_REQUEST_BODY http_get_request_body(id, req, *) false when Protocol::HTTP_RESPONSE http_response(id, req, *) else # invalid message true end end |
#http_request_send_error_response(error) ⇒ Object
171 172 173 174 175 176 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 171 def http_request_send_error_response(error) response = format("Error: %s\n%s", error.inspect, error.backtrace.join("\n")) req.respond(response, ':status' => Qeweney::Status::INTERNAL_SERVER_ERROR) rescue IOError, SystemCallError # ignore end |
#http_response(id, req, body, headers, complete, transfer_count_key) ⇒ Object
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 239 def http_response(id, req, body, headers, complete, transfer_count_key) if !req.headers_sent? && complete req.respond(body, headers|| {}) if transfer_count_key rx, tx = req.transfer_counts send_transfer_count(transfer_count_key, rx, tx) end true else req.send_headers(headers) if headers && !req.headers_sent? req.send_chunk(body, done: complete) if body or complete if complete && transfer_count_key rx, tx = req.transfer_counts send_transfer_count(transfer_count_key, rx, tx) end complete end rescue IOError, SystemCallError # ignore error end |
#http_upgrade(req, protocol) ⇒ Object
276 277 278 279 280 281 282 283 284 285 286 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 276 def http_upgrade(req, protocol) if protocol == 'websocket' handle_websocket_upgrade(req) else # other protocol upgrades should be handled by the agent, so we just run # the request normally. The agent is expected to upgrade the connection # using a http_upgrade message. From that moment on, two-way # communication is handled using conn_data and conn_close messages. http_request(req) end end |
#keep_alive ⇒ Object
68 69 70 71 72 73 74 75 76 77 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 68 def keep_alive now = Time.now if now - @last_send >= Protocol::SEND_TIMEOUT (Protocol.ping) end # if now - @last_recv >= Protocol::RECV_TIMEOUT # raise TimeoutError # end rescue TimeoutError, IOError end |
#process_incoming_messages(shutdown = false) ⇒ Object
45 46 47 48 49 50 51 52 53 54 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 45 def (shutdown = false) return if shutdown && @requests.empty? @conn.feed_loop(@msgpack_reader, :feed_each) do |msg| (msg) return if shutdown && @requests.empty? end rescue TimeoutError, IOError, SystemCallError # ignore and just return in order to terminate the proxy end |
#recv_df_message(message) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 92 def () @last_recv = Time.now # puts "<<< #{message.inspect}" case [Protocol::Attribute::KIND] when Protocol::PING return when Protocol::UNMOUNT return unmount when Protocol::STATS_REQUEST return handle_stats_request([Protocol::Attribute::ID]) end handler = @requests[[Protocol::Attribute::ID]] if !handler # puts "Unknown request id in #{message}" return end handler << end |
#register_request_fiber ⇒ Object
HTTP / WebSocket
123 124 125 126 127 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 123 def register_request_fiber id = (@last_request_id += 1) @requests[id] = Fiber.current id end |
#route ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 79 def route case @req.headers['df-mount'] when /^\s*host\s*=\s*([^\s]+)/ { host: Regexp.last_match(1) } when /^\s*path\s*=\s*([^\s]+)/ { path: Regexp.last_match(1) } when /catch_all/ { catch_all: true } else nil end end |
#run ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 31 def run @fiber = Fiber.current @service.mount(route, self) @mounted = true # keep_alive_timer = spin_loop("#{@fiber.tag}-keep_alive", interval: 5) { keep_alive } (false) rescue GracefulShutdown puts "Proxy got graceful shutdown, left: #{@requests.size} requests" if @requests.size > 0 move_on_after(15) { (true) } ensure # keep_alive_timer&.stop unmount end |
#run_websocket_connection(id, websocket) ⇒ Object
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 310 def run_websocket_connection(id, websocket) reader = spin("#{Fiber.current}.#{id}-ws") do websocket.recv_loop do |data| (Protocol.ws_data(id, data)) end end while ( = receive) case [Protocol::Attribute::KIND] when Protocol::WS_DATA websocket << [Protocol::Attribute::WS::DATA] when Protocol::WS_CLOSE return else raise "Unexpected websocket message #{.inspect}" end end ensure reader.stop websocket.close end |
#send_df_message(message) ⇒ Object
114 115 116 117 118 119 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 114 def () # puts ">>> #{message.inspect}" unless message[Protocol::Attribute::KIND] == Protocol::PING @last_send = Time.now @conn << .to_msgpack end |
#send_shutdown ⇒ Object
63 64 65 66 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 63 def send_shutdown (Protocol.shutdown) @fiber.raise GracefulShutdown.new end |
#send_transfer_count(key, rx, tx) ⇒ Object
195 196 197 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 195 def send_transfer_count(key, rx, tx) (Protocol.transfer_count(key, rx, tx)) end |
#unmount ⇒ Object
56 57 58 59 60 61 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 56 def unmount return unless @mounted @service.unmount(self) @mounted = nil end |
#unregister_request_fiber(id) ⇒ Object
129 130 131 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 129 def unregister_request_fiber(id) @requests.delete(id) end |
#with_request ⇒ Object
133 134 135 136 137 138 139 140 141 |
# File 'lib/tipi/digital_fabric/agent_proxy.rb', line 133 def with_request @current_request_count += 1 id = (@last_request_id += 1) @requests[id] = Fiber.current yield id ensure @current_request_count -= 1 @requests.delete(id) end |