Class: DigitalFabric::Agent

Inherits:
Object
  • Object
show all
Defined in:
lib/tipi/digital_fabric/agent.rb

Defined Under Namespace

Classes: GracefulShutdown, TimeoutError

Constant Summary collapse

# Template for the HTTP/1.1 request that asks the server to upgrade the
# connection to the "df" protocol. The two %s placeholders are filled by
# #df_upgrade with the agent token (DF-Token) and the mount point (DF-Mount).
# The empty line before the heredoc terminator is part of the string: it is
# the blank line that ends the request headers.
UPGRADE_REQUEST =
<<~HTTP
GET / HTTP/1.1
Host: localhost
Connection: upgrade
Upgrade: df
DF-Token: %s
DF-Mount: %s

HTTP
# Class-level variable initialised to 0. None of the methods shown on this
# page read or update it. NOTE(review): presumably an id counter - confirm
# against the rest of the class.
@@id =
0

Instance Method Summary collapse

Constructor Details

#initialize(server_url, route, token) ⇒ Agent

Returns a new instance of Agent.



12
13
14
15
16
17
18
19
# File 'lib/tipi/digital_fabric/agent.rb', line 12

# Stores the connection settings and creates empty bookkeeping tables.
# No connection is opened here; that happens in
# #connect_and_process_incoming_requests.
#
# @param server_url address of the server; "host:port" selects TCP, anything
#   else is used as a UNIX socket path (see #connect_to_server)
# @param route object indexed with :host / :path by #mount_point
# @param token value sent in the DF-Token header by #df_upgrade
def initialize(server_url, route, token)
  @server_url, @route, @token = server_url, route, token

  # Both tables are keyed by request id and hold the fiber handling it.
  @requests = Hash.new
  @long_running_requests = Hash.new

  # Label used by #log.
  @name = '<unknown>'
end

Instance Method Details

#connect_and_process_incoming_requests ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/tipi/digital_fabric/agent.rb', line 41

# Opens a connection to the server, performs the DF upgrade handshake and then
# processes incoming messages until the connection ends.
#
# Connection-level errors (IOError, ECONNREFUSED, ECONNRESET, EPIPE,
# TimeoutError) are swallowed so that #run can retry.
#
# Cleanup lives in `ensure` rather than in the rescue clause because
# #process_incoming_requests swallows IOError/SystemCallError itself and
# returns normally. With rescue-only cleanup, @connected stayed true after a
# failed read, so #keep_alive kept pinging a dead socket and the socket was
# never closed.
#
# @return the value of #process_incoming_requests, or nil when a
#   connection-level error was rescued
def connect_and_process_incoming_requests
  # log 'Connecting...'
  @socket = connect_to_server
  @last_recv = @last_send = Time.now

  df_upgrade
  @connected = true
  @msgpack_reader = MessagePack::Unpacker.new

  process_incoming_requests
rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE, TimeoutError
  # Expected when the server is unreachable or drops the connection.
  nil
ensure
  log 'Disconnected' if @connected
  @connected = nil
  # The socket is closed but deliberately not set to nil: request fibers that
  # still try to send then get an IOError (which they rescue) rather than a
  # NoMethodError on nil.
  @socket&.close
end

#connect_to_server ⇒ Object



56
57
58
59
60
61
62
63
64
# File 'lib/tipi/digital_fabric/agent.rb', line 56

# Opens the transport to the server.
#
# When @server_url looks like "host:port" a TCP connection is made through
# Polyphony::Net.tcp_connect. The port is passed on as the captured String.
# Any other value is used as the path of a UNIX socket.
#
# @return the connected socket
def connect_to_server
  tcp_address = /^([^\:]+)\:(\d+)$/.match(@server_url)
  return UNIXSocket.new(@server_url) unless tcp_address

  host, port = tcp_address.captures
  Polyphony::Net.tcp_connect(host, port)
end

#df_upgrade ⇒ Object



76
77
78
79
80
81
82
# File 'lib/tipi/digital_fabric/agent.rb', line 76

# Sends the DF upgrade request and consumes the server's response headers up
# to and including the blank line that terminates them.
#
# The response content itself is not inspected; only the end of the header
# section is awaited.
#
# @raise [EOFError] if the server closes the connection before the blank line
#   arrives. Previously this ended the loop silently and the connection was
#   treated as upgraded. EOFError is an IOError, so
#   #connect_and_process_incoming_requests handles it like any other
#   connection failure.
# @return [nil]
def df_upgrade
  @socket << format(UPGRADE_REQUEST, @token, mount_point)
  loop do
    line = @socket.gets
    raise EOFError, 'connection closed before DF upgrade completed' unless line
    break if line.chomp.empty?
  end
  # log 'Connection upgraded'
end

#get_http_request_body(id, limit) ⇒ Object



201
202
203
204
# File 'lib/tipi/digital_fabric/agent.rb', line 201

# Asks the server for (more of) the body of the request with the given id and
# waits for the answer.
#
# The reply is whatever the current fiber receives next. In this class
# #recv_http_request_body forwards the body of the matching
# HTTP_REQUEST_BODY message to the request's fiber.
#
# @param id request id the body belongs to
# @param limit passed through to Protocol.http_get_request_body
# @return the value delivered by `receive`
def get_http_request_body(id, limit)
  body_request = Protocol.http_get_request_body(id, limit)
  send_df_message(body_request)
  receive
end

#http_request(req) ⇒ Object

Default handler for HTTP requests: responds with an empty body and a Service Unavailable status.



221
222
223
# File 'lib/tipi/digital_fabric/agent.rb', line 221

# Default handler for HTTP requests: answers with an empty body and a
# Service Unavailable status.
#
# The status is passed under the String key ':status', the same key
# #recv_http_request uses for its shutdown response. The previous
# `':status':` syntax produced a Symbol key instead.
#
# @param req request object responding to #respond
# @return the result of req.respond
def http_request(req)
  req.respond(nil, { ':status' => Qeweney::Status::SERVICE_UNAVAILABLE })
end

#is_long_running_request_response?(msg) ⇒ Boolean

Returns:

  • (Boolean)


152
153
154
155
156
157
158
159
# File 'lib/tipi/digital_fabric/agent.rb', line 152

# Tells whether an outgoing message marks its request as long-running.
#
# An HTTP upgrade always does. An HTTP response does when its COMPLETE
# attribute is falsy, i.e. more of the response is still to come.
#
# @param msg outgoing DF message, indexed by Protocol::Attribute keys
# @return [Boolean, nil] nil for every other kind of message
def is_long_running_request_response?(msg)
  kind = msg[Protocol::Attribute::KIND]
  case kind
  when Protocol::HTTP_UPGRADE then true
  when Protocol::HTTP_RESPONSE then !msg[Protocol::Attribute::HttpResponse::COMPLETE]
  end
end

#keep_alive ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/tipi/digital_fabric/agent.rb', line 107

# Periodic keep-alive tick (scheduled from #run). While connected, sends a
# ping once nothing has been sent for at least Protocol::SEND_TIMEOUT.
#
# A send failure is not handled here: it is re-raised in the fiber stored in
# @fiber (set in #run).
def keep_alive
  return unless @connected

  idle_time = Time.now - @last_send
  send_df_message(Protocol.ping) if idle_time >= Protocol::SEND_TIMEOUT
  # Receive-timeout enforcement is currently disabled:
  # if now - @last_recv >= Protocol::RECV_TIMEOUT
  #   raise TimeoutError
  # end
rescue IOError, SystemCallError => e
  # transmit exception to fiber running the agent
  @fiber.raise(e)
end

#log(msg) ⇒ Object



94
95
96
# File 'lib/tipi/digital_fabric/agent.rb', line 94

# Prints a log line to standard output in the form
# "<current time> (<agent name>) <message>".
#
# @param msg message to print
# @return [nil]
def log(msg)
  puts format('%s (%s) %s', Time.now, @name, msg)
end

#mount_point ⇒ Object



84
85
86
87
88
89
90
91
92
# File 'lib/tipi/digital_fabric/agent.rb', line 84

# Builds the value placed in the DF-Mount header by #df_upgrade.
#
# A :host entry in the route takes precedence over a :path entry.
#
# @return [String, nil] "host=<host>", "path=<path>", or nil when the route
#   has neither entry
def mount_point
  host = @route[:host]
  return "host=#{host}" if host

  path = @route[:path]
  "path=#{path}" if path
end

#prepare_http_request(msg) ⇒ Object



185
186
187
188
189
190
191
192
# File 'lib/tipi/digital_fabric/agent.rb', line 185

# Builds a Qeweney::Request from an incoming HTTP request message.
#
# If the message already carries a body chunk, that chunk is buffered on the
# request. The message's COMPLETE attribute was previously read into an unused
# local; that dead read has been removed.
#
# @param msg DF message, indexed by Protocol::Attribute::HttpRequest keys
# @return [Qeweney::Request]
def prepare_http_request(msg)
  headers = msg[Protocol::Attribute::HttpRequest::HEADERS]
  body_chunk = msg[Protocol::Attribute::HttpRequest::BODY_CHUNK]

  req = Qeweney::Request.new(headers, RequestAdapter.new(self, msg))
  req.buffer_body_chunk(body_chunk) if body_chunk
  req
end

#process_incoming_requests ⇒ Object



98
99
100
101
102
103
104
105
# File 'lib/tipi/digital_fabric/agent.rb', line 98

# Reads from the socket, feeds the data to the MessagePack unpacker and
# dispatches every decoded message to #recv_df_message.
#
# Reading stops as soon as a shutdown has been requested and no request is
# pending any more. I/O errors, system call errors and TimeoutError end the
# loop silently.
def process_incoming_requests
  @socket.feed_loop(@msgpack_reader, :feed_each) do |message|
    recv_df_message(message)
    # Leaving the block ends feed_loop, and with it this method.
    break if @shutdown && @requests.empty?
  end
rescue IOError, SystemCallError, TimeoutError
  # ignore
end

#recv_df_message(msg) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/tipi/digital_fabric/agent.rb', line 122

# Dispatches one incoming DF message according to its KIND attribute and
# records the time of reception in @last_recv.
#
# Connection and WebSocket data/close messages are forwarded to the fiber
# registered for the message's id. They are dropped when no such fiber exists.
# Messages of any other kind are ignored.
#
# @param msg decoded DF message, indexed by Protocol::Attribute keys
def recv_df_message(msg)
  @last_recv = Time.now

  kind = msg[Protocol::Attribute::KIND]
  case kind
  when Protocol::SHUTDOWN then recv_shutdown
  when Protocol::HTTP_REQUEST then recv_http_request(msg)
  when Protocol::HTTP_REQUEST_BODY then recv_http_request_body(msg)
  when Protocol::WS_REQUEST then recv_ws_request(msg)
  when Protocol::CONN_DATA, Protocol::CONN_CLOSE, Protocol::WS_DATA, Protocol::WS_CLOSE
    target = @requests[msg[Protocol::Attribute::ID]]
    target << msg if target
  end
end

#recv_http_request(msg) ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/tipi/digital_fabric/agent.rb', line 168

# Handles an incoming HTTP request message: builds the request object and
# runs #http_request for it in a newly spun fiber.
#
# The fiber is stored in @requests under the request id, which is how
# #recv_df_message and #recv_http_request_body find it when later messages
# for the same id arrive.
#
# @param msg DF message, indexed by Protocol::Attribute keys
# @return the spun fiber (value of the assignment)
def recv_http_request(msg)
  req = prepare_http_request(msg)
  id = msg[Protocol::Attribute::ID]
  # The fiber is tagged "<current fiber tag>.<request id>".
  @requests[id] = spin("#{Fiber.current.tag}.#{id}") do
    http_request(req)
  rescue IOError, Errno::ECONNREFUSED, Errno::EPIPE
    # ignore - connection-level failures while handling the request are
    # deliberately dropped here
  rescue Polyphony::Terminate => e
    # When the fiber is terminated as part of a graceful shutdown, answer the
    # request with Service Unavailable before propagating the termination.
    req.respond(nil, { ':status' => Qeweney::Status::SERVICE_UNAVAILABLE }) if Fiber.current.graceful_shutdown?
    raise e
  ensure
    # Always deregister the request. If a shutdown was requested and this was
    # the last pending request, terminate the fiber stored in @fiber (set in
    # #run).
    @requests.delete(id)
    @long_running_requests.delete(id)
    @fiber.terminate if @shutdown && @requests.empty?
  end
end

#recv_http_request_body(msg) ⇒ Object



194
195
196
197
198
199
# File 'lib/tipi/digital_fabric/agent.rb', line 194

# Forwards the BODY attribute of a request-body message to the fiber that
# handles the request with the same id. Does nothing when no fiber is
# registered under that id.
#
# @param msg DF message, indexed by Protocol::Attribute keys
# @return the result of `fiber << body`, or nil when nothing was forwarded
def recv_http_request_body(msg)
  request_id = msg[Protocol::Attribute::ID]
  handler = @requests[request_id]
  handler << msg[Protocol::Attribute::HttpRequestBody::BODY] if handler
end

#recv_shutdown ⇒ Object



161
162
163
164
165
166
# File 'lib/tipi/digital_fabric/agent.rb', line 161

# Handles a shutdown message: flags the agent as shutting down and terminates
# the fibers of all long-running requests, passing `true` to terminate.
#
# The fibers are taken from a snapshot (`values`) of the table, so entries
# removed by the fibers' own cleanup do not disturb the iteration.
#
# @return [Array] the terminated fibers
def recv_shutdown
  # puts "Received shutdown message (#{@requests.size} pending requests)"
  # puts "  (Long running requests: #{@long_running_requests.size})"
  @shutdown = true
  long_running = @long_running_requests.values
  long_running.each { |fiber| fiber.terminate(true) }
end

#recv_ws_request(msg) ⇒ Object



206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/tipi/digital_fabric/agent.rb', line 206

# Handles an incoming WebSocket request message: builds the request object
# from the message's WS headers and runs #ws_request for it in a newly spun
# fiber.
#
# The fiber is registered under the request id in both @requests and
# @long_running_requests, so #recv_shutdown terminates it on shutdown.
#
# @param msg DF message, indexed by Protocol::Attribute keys
# @return the spun fiber (value of the assignment)
def recv_ws_request(msg)
  req = Qeweney::Request.new(msg[Protocol::Attribute::WS::HEADERS], RequestAdapter.new(self, msg))
  id = msg[Protocol::Attribute::ID]
  # The fiber is tagged "<current fiber tag>.<request id>-ws".
  @requests[id] = @long_running_requests[id] = spin("#{Fiber.current.tag}.#{id}-ws") do
    ws_request(req)
  rescue IOError, Errno::ECONNREFUSED, Errno::EPIPE
    # ignore - connection-level failures while handling the request are
    # deliberately dropped here
  ensure
    # Always deregister the request. If a shutdown was requested and this was
    # the last pending request, terminate the fiber stored in @fiber (set in
    # #run).
    @requests.delete(id)
    @long_running_requests.delete(id)
    @fiber.terminate if @shutdown && @requests.empty?
  end
end

#run ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
# File 'lib/tipi/digital_fabric/agent.rb', line 29

# Runs the agent in the current fiber: starts the keep-alive timer, then
# connects and processes requests, reconnecting 5 seconds after every
# disconnect until a shutdown has been requested.
#
# The keep-alive timer is stopped when the method exits for any reason. The
# safe-navigation call matters when starting the timer itself fails:
# @keep_alive_timer is then still nil, and calling #stop on it would replace
# the real error with a NoMethodError.
#
# @return [nil]
def run
  @fiber = Fiber.current
  @keep_alive_timer = spin_loop("#{@fiber.tag}-keep_alive", interval: 5) { keep_alive }
  loop do
    connect_and_process_incoming_requests
    break if @shutdown

    # Wait before trying to reconnect.
    sleep 5
  end
ensure
  @keep_alive_timer&.stop
end

#send_df_message(msg) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
# File 'lib/tipi/digital_fabric/agent.rb', line 140

# Serializes a DF message with MessagePack and writes it to the socket,
# updating @last_send (used by #keep_alive) just before the write.
#
# Long-running requests are detected by a simple heuristic on the outgoing
# message (see #is_long_running_request_response?). The fiber handling such a
# request is recorded in @long_running_requests so it can be stopped on
# graceful shutdown.
#
# @param msg DF message responding to #to_msgpack
# @return the result of the socket write
def send_df_message(msg)
  if is_long_running_request_response?(msg)
    request_id = msg[Protocol::Attribute::ID]
    @long_running_requests.store(request_id, @requests[request_id])
  end

  @last_send = Time.now
  @socket << msg.to_msgpack
end

#ws_request(req) ⇒ Object

Default handler for WebSocket requests: responds with an empty body and a Service Unavailable status.



226
227
228
# File 'lib/tipi/digital_fabric/agent.rb', line 226

# Default handler for WebSocket requests: answers with an empty body and a
# Service Unavailable status.
#
# The status is passed under the String key ':status', the same key
# #recv_http_request uses for its shutdown response. The previous
# `':status':` syntax produced a Symbol key instead.
#
# @param req request object responding to #respond
# @return the result of req.respond
def ws_request(req)
  req.respond(nil, { ':status' => Qeweney::Status::SERVICE_UNAVAILABLE })
end