Class: DigitalFabric::Agent
- Inherits:
-
Object
- Object
- DigitalFabric::Agent
show all
- Defined in:
- lib/tipi/digital_fabric/agent.rb
Defined Under Namespace
Classes: GracefulShutdown, TimeoutError
Constant Summary
collapse
- UPGRADE_REQUEST =
<<~HTTP
GET / HTTP/1.1
Host: localhost
Connection: upgrade
Upgrade: df
DF-Token: %s
DF-Mount: %s
HTTP
- @@id =
0
Instance Method Summary
collapse
Constructor Details
#initialize(server_url, route, token) ⇒ Agent
Returns a new instance of Agent.
12
13
14
15
16
17
18
19
|
# File 'lib/tipi/digital_fabric/agent.rb', line 12
def initialize(server_url, route, token)
@server_url = server_url
@route = route
@token = token
@requests = {}
@long_running_requests = {}
@name = '<unknown>'
end
|
Instance Method Details
#connect_and_process_incoming_requests ⇒ Object
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
# File 'lib/tipi/digital_fabric/agent.rb', line 41
def connect_and_process_incoming_requests
@socket = connect_to_server
@last_recv = @last_send = Time.now
df_upgrade
@connected = true
@msgpack_reader = MessagePack::Unpacker.new
process_incoming_requests
rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE, TimeoutError
log 'Disconnected' if @connected
@connected = nil
end
|
#connect_to_server ⇒ Object
56
57
58
59
60
61
62
63
64
|
# File 'lib/tipi/digital_fabric/agent.rb', line 56
def connect_to_server
if @server_url =~ /^([^\:]+)\:(\d+)$/
host = Regexp.last_match(1)
port = Regexp.last_match(2)
Polyphony::Net.tcp_connect(host, port)
else
UNIXSocket.new(@server_url)
end
end
|
#df_upgrade ⇒ Object
76
77
78
79
80
81
82
|
# File 'lib/tipi/digital_fabric/agent.rb', line 76
def df_upgrade
@socket << format(UPGRADE_REQUEST, @token, mount_point)
while (line = @socket.gets)
break if line.chomp.empty?
end
end
|
#get_http_request_body(id, limit) ⇒ Object
201
202
203
204
|
# File 'lib/tipi/digital_fabric/agent.rb', line 201
def get_http_request_body(id, limit)
send_df_message(Protocol.http_get_request_body(id, limit))
receive
end
|
#http_request(req) ⇒ Object
default handler for HTTP request
221
222
223
|
# File 'lib/tipi/digital_fabric/agent.rb', line 221
def http_request(req)
req.respond(nil, { ':status': Qeweney::Status::SERVICE_UNAVAILABLE })
end
|
#is_long_running_request_response?(msg) ⇒ Boolean
#keep_alive ⇒ Object
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
# File 'lib/tipi/digital_fabric/agent.rb', line 107
def keep_alive
return unless @connected
now = Time.now
if now - @last_send >= Protocol::SEND_TIMEOUT
send_df_message(Protocol.ping)
end
rescue IOError, SystemCallError => e
@fiber.raise(e)
end
|
#log(msg) ⇒ Object
94
95
96
|
# File 'lib/tipi/digital_fabric/agent.rb', line 94
def log(msg)
puts "#{Time.now} (#{@name}) #{msg}"
end
|
#mount_point ⇒ Object
84
85
86
87
88
89
90
91
92
|
# File 'lib/tipi/digital_fabric/agent.rb', line 84
def mount_point
if @route[:host]
"host=#{@route[:host]}"
elsif @route[:path]
"path=#{@route[:path]}"
else
nil
end
end
|
#prepare_http_request(msg) ⇒ Object
#process_incoming_requests ⇒ Object
98
99
100
101
102
103
104
105
|
# File 'lib/tipi/digital_fabric/agent.rb', line 98
def process_incoming_requests
@socket.feed_loop(@msgpack_reader, :feed_each) do |msg|
recv_df_message(msg)
return if @shutdown && @requests.empty?
end
rescue IOError, SystemCallError, TimeoutError
end
|
#recv_df_message(msg) ⇒ Object
#recv_http_request(msg) ⇒ Object
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
|
# File 'lib/tipi/digital_fabric/agent.rb', line 168
def recv_http_request(msg)
req = prepare_http_request(msg)
id = msg[Protocol::Attribute::ID]
@requests[id] = spin("#{Fiber.current.tag}.#{id}") do
http_request(req)
rescue IOError, Errno::ECONNREFUSED, Errno::EPIPE
rescue Polyphony::Terminate => e
req.respond(nil, { ':status' => Qeweney::Status::SERVICE_UNAVAILABLE }) if Fiber.current.graceful_shutdown?
raise e
ensure
@requests.delete(id)
@long_running_requests.delete(id)
@fiber.terminate if @shutdown && @requests.empty?
end
end
|
#recv_http_request_body(msg) ⇒ Object
#recv_shutdown ⇒ Object
161
162
163
164
165
166
|
# File 'lib/tipi/digital_fabric/agent.rb', line 161
def recv_shutdown
@shutdown = true
@long_running_requests.values.each { |f| f.terminate(true) }
end
|
#recv_ws_request(msg) ⇒ Object
206
207
208
209
210
211
212
213
214
215
216
217
218
|
# File 'lib/tipi/digital_fabric/agent.rb', line 206
def recv_ws_request(msg)
req = Qeweney::Request.new(msg[Protocol::Attribute::WS::HEADERS], RequestAdapter.new(self, msg))
id = msg[Protocol::Attribute::ID]
@requests[id] = @long_running_requests[id] = spin("#{Fiber.current.tag}.#{id}-ws") do
ws_request(req)
rescue IOError, Errno::ECONNREFUSED, Errno::EPIPE
ensure
@requests.delete(id)
@long_running_requests.delete(id)
@fiber.terminate if @shutdown && @requests.empty?
end
end
|
#run ⇒ Object
29
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/tipi/digital_fabric/agent.rb', line 29
def run
@fiber = Fiber.current
@keep_alive_timer = spin_loop("#{@fiber.tag}-keep_alive", interval: 5) { keep_alive }
while true
connect_and_process_incoming_requests
return if @shutdown
sleep 5
end
ensure
@keep_alive_timer.stop
end
|
#send_df_message(msg) ⇒ Object
140
141
142
143
144
145
146
147
148
149
150
|
# File 'lib/tipi/digital_fabric/agent.rb', line 140
def send_df_message(msg)
if is_long_running_request_response?(msg)
id = msg[Protocol::Attribute::ID]
@long_running_requests[id] = @requests[id]
end
@last_send = Time.now
@socket << msg.to_msgpack
end
|
#ws_request(req) ⇒ Object
default handler for WS request
226
227
228
|
# File 'lib/tipi/digital_fabric/agent.rb', line 226
def ws_request(req)
req.respond(nil, { ':status': Qeweney::Status::SERVICE_UNAVAILABLE })
end
|