Class: DigitalFabric::Service

Inherits:
Object
  • Object
show all
Defined in:
lib/tipi/digital_fabric/service.rb

Constant Summary collapse

INVALID_HOST =
'INVALID_HOST'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(token:) ⇒ Service

Returns a new instance of Service.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/tipi/digital_fabric/service.rb', line 12

# Sets up a service with empty agent and route tables and zeroed counters.
#
# @param token [Object] value later compared against the 'df-token' request
#   header when an agent asks to upgrade its connection (see #df_upgrade)
def initialize(token:)
  @token = token
  @agents = {}
  @routes = {}
  # Routing state is initialized eagerly so that #find_agent can run before
  # the first #mount without calling #find on nil.
  @route_keys = []
  @routing_changed = false
  @executive = nil
  @shutdown = false
  @counters = {
    connections: 0,
    http_requests: 0,
    errors: 0
  }
  @connection_count = 0
  @current_request_count = 0
  @http_latency_accumulator = 0
  @http_latency_counter = 0
  @http_latency_max = 0
  # The baseline is backdated by one second so the first stats interval
  # computed by #calculate_stats has a non-zero duration.
  @last_counters = @counters.merge(stamp: Time.now.to_f - 1)
  @fiber = Fiber.current
  # @timer = Polyphony::Timer.new('service_timer', resolution: 5)
end

Instance Attribute Details

#stats ⇒ Object (readonly)

Returns the value of attribute stats.



106
107
108
# File 'lib/tipi/digital_fabric/service.rb', line 106

# Reader for the @stats instance variable.
#
# NOTE(review): none of the methods shown here assign @stats, so this returns
# nil unless it is populated elsewhere — confirm against the full source.
#
# @return [Object, nil] the current value of @stats
def stats
  @stats
end

#timer ⇒ Object (readonly)

Returns the value of attribute timer.



10
11
12
# File 'lib/tipi/digital_fabric/service.rb', line 10

# Reader for the @timer instance variable.
#
# NOTE(review): the only visible assignment to @timer is commented out in
# #initialize, so this presumably returns nil — confirm against the full source.
#
# @return [Object, nil] the current value of @timer
def timer
  @timer
end

#token ⇒ Object (readonly)

Returns the value of attribute token.



9
10
11
# File 'lib/tipi/digital_fabric/service.rb', line 9

# Reader for the token passed to the constructor. #df_upgrade compares it with
# the 'df-token' request header before accepting an agent connection.
#
# @return [Object] the value given as +token:+ to #initialize
def token
  @token
end

Instance Method Details

#calculate_stats ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/tipi/digital_fabric/service.rb', line 31

# Builds a metrics snapshot covering the interval since the previous call and
# resets the per-interval latency accumulators and the counter baseline.
#
# Rates are the counter deltas divided by the elapsed wall-clock seconds.
#
# @return [Hash] nested hash with :service, :backend and :process sections
def calculate_stats
  now = Time.now.to_f
  previous = @last_counters
  elapsed = now - previous[:stamp]
  connections, http_requests, errors =
    %i[connections http_requests errors].map { |key| @counters[key] - previous[key] }
  @last_counters = @counters.merge(stamp: now)

  average_latency =
    if @http_latency_counter == 0
      0
    else
      @http_latency_accumulator / @http_latency_counter
    end
  max_latency = @http_latency_max
  # Latency figures are per interval, so start the next interval from zero.
  @http_latency_accumulator = 0
  @http_latency_counter = 0
  @http_latency_max = 0

  cpu, rss = pid_cpu_and_rss(Process.pid)
  backend_stats = Thread.backend.stats
  object_space_stats = ObjectSpace.count_objects

  service_section = {
    agent_count: @agents.size,
    connection_count: @connection_count,
    connection_rate: connections / elapsed,
    error_rate: errors / elapsed,
    http_request_rate: http_requests / elapsed,
    latency_avg: average_latency,
    latency_max: max_latency,
    pending_requests: @current_request_count
  }
  backend_section = {
    op_rate: backend_stats[:op_count] / elapsed,
    pending_ops: backend_stats[:pending_ops],
    poll_rate: backend_stats[:poll_count] / elapsed,
    runqueue_size: backend_stats[:runqueue_size],
    runqueue_high_watermark: backend_stats[:runqueue_max_length],
    switch_rate: backend_stats[:switch_count] / elapsed
  }
  process_section = {
    cpu_usage: cpu,
    rss: rss.to_f / 1024,
    objects_total: object_space_stats[:TOTAL],
    objects_free: object_space_stats[:FREE]
  }

  { service: service_section, backend: backend_section, process: process_section }
end

#compile_agent_routes ⇒ Object



229
230
231
232
233
234
235
236
237
238
# File 'lib/tipi/digital_fabric/service.rb', line 229

# Rebuilds the route => agent lookup table from the mounted agents and clears
# the "routing changed" flag.
#
# Agents are visited from the most recently inserted to the oldest, and an
# existing entry is never overwritten, so when several agents share an equal
# route the newest one is kept.
#
# @return [Array] the compiled route keys (also stored in @route_keys)
def compile_agent_routes
  @routing_changed = false
  @routes.clear

  @agents.keys.reverse_each { |agent| @routes[@agents[agent]] ||= agent }

  @route_keys = @routes.keys
end

#decr_connection_count ⇒ Object



102
103
104
# File 'lib/tipi/digital_fabric/service.rb', line 102

# Lowers the open-connection gauge by one.
#
# @return [Integer] the updated connection count
def decr_connection_count
  @connection_count = @connection_count.pred
end

#df_upgrade(req) ⇒ Object



182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/tipi/digital_fabric/service.rb', line 182

# Handles a 'df' protocol upgrade: checks the request's 'df-token' header
# against the service token, writes the upgrade response to the connection and
# hands the request to a new AgentProxy.
#
# The pending-request gauge is decremented on entry and restored on exit, so
# an agent connection is not counted as a pending request while it is handled.
#
# NOTE(review): the token is compared with plain ==, which is not
# constant-time — consider a timing-safe comparison.
#
# @param req [Object] the upgrade request
# @return [Object] the FORBIDDEN response result, or the new AgentProxy
def df_upgrade(req)
  @current_request_count -= 1

  authorized = req.headers['df-token'] == @token
  return req.respond(nil, ':status' => Qeweney::Status::FORBIDDEN) unless authorized

  connection = req.adapter.conn
  connection << Protocol.df_upgrade_response
  AgentProxy.new(self, req)
ensure
  @current_request_count += 1
end

#find_agent(req) ⇒ Object



215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/tipi/digital_fabric/service.rb', line 215

# Finds the agent that should serve +req+.
#
# The host is taken from the ':authority' header, then 'host', and finally
# falls back to INVALID_HOST. A route matches when its :host equals that host
# or its :path_regexp matches the ':path' header; the first match wins.
#
# @param req [Object] request exposing a +headers+ hash
# @return [Object, nil] the matching agent, or nil when no route matches
def find_agent(req)
  # @route_keys is nil until the first compilation. Compile on demand so a
  # request arriving before any #mount yields nil instead of raising
  # NoMethodError on nil.
  compile_agent_routes if @routing_changed || @route_keys.nil?

  host = req.headers[':authority'] || req.headers['host'] || INVALID_HOST
  path = req.headers[':path']

  matched = @route_keys.find do |candidate|
    (host == candidate[:host]) || (path =~ candidate[:path_regexp])
  end
  matched && @routes[matched]
end

#get_stats ⇒ Object



94
95
96
# File 'lib/tipi/digital_fabric/service.rb', line 94

# Returns a freshly computed metrics snapshot by delegating to
# #calculate_stats. Each call therefore also resets the per-interval latency
# accumulators and the counter baseline maintained by that method.
#
# @return [Hash] the hash produced by #calculate_stats
def get_stats
  calculate_stats
end

#graceful_shutdown ⇒ Object



244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/tipi/digital_fabric/service.rb', line 244

# Starts a graceful shutdown: marks the service as shutting down, asks every
# agent that supports it to shut down, drops the agents that do not, and then
# polls until the agent table is empty, inside a 60-unit +move_on_after+ block.
#
# @return [Object] whatever +move_on_after+ returns
def graceful_shutdown
  @shutdown = true

  # Iterate over a snapshot of the keys because entries may be deleted here.
  @agents.keys.each do |agent|
    next agent.send_shutdown if agent.respond_to?(:send_shutdown)

    @agents.delete(agent)
  end

  move_on_after(60) do
    sleep 0.25 until @agents.empty?
  end
end

#http_request(req, allow_df_upgrade = false) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/tipi/digital_fabric/service.rb', line 127

# Entry point for an incoming HTTP request.
#
# Updates the request and connection counters, then either delegates to
# #upgrade_request (when the request asks for a protocol upgrade) or routes
# the request to the matching agent. When no agent matches, the error counter
# is bumped and a SERVICE_UNAVAILABLE response is sent.
#
# I/O-type failures only bump the error counter; any other StandardError is
# also dumped to stdout and answered with INTERNAL_SERVER_ERROR. On exit the
# pending-request gauge is restored and, during shutdown, the connection is
# closed.
#
# @param req [Object] the incoming request
# @param allow_df_upgrade [Boolean] passed through to #upgrade_request
# @return [Object] the result of whichever handler ran
def http_request(req, allow_df_upgrade = false)
  @current_request_count += 1
  @counters[:http_requests] += 1
  @counters[:connections] += 1 if req.headers[':first']

  if req.upgrade_protocol
    upgrade_request(req, allow_df_upgrade)
  else
    inject_request_headers(req)
    if (target = find_agent(req))
      target.http_request(req)
    else
      @counters[:errors] += 1
      req.respond(nil, ':status' => Qeweney::Status::SERVICE_UNAVAILABLE)
    end
  end
rescue IOError, SystemCallError, HTTP2::Error::StreamClosed
  # Connection-level failures: nothing can be sent back, just count them.
  @counters[:errors] += 1
rescue => error
  @counters[:errors] += 1
  puts '*' * 40
  p req
  p error
  puts error.backtrace.join("\n")
  req.respond(error.inspect, ':status' => Qeweney::Status::INTERNAL_SERVER_ERROR)
ensure
  @current_request_count -= 1
  req.adapter.conn.close if @shutdown
end

#incr_connection_count ⇒ Object



98
99
100
# File 'lib/tipi/digital_fabric/service.rb', line 98

# Raises the open-connection gauge by one.
#
# @return [Integer] the updated connection count
def incr_connection_count
  @connection_count = @connection_count.succ
end

#inject_request_headers(req) ⇒ Object



156
157
158
159
160
161
# File 'lib/tipi/digital_fabric/service.rb', line 156

# Adds proxy headers to the request before it is forwarded:
# - 'x-request-id' is set to a fresh UUID,
# - 'x-forwarded-for' is overwritten with the third element of the
#   connection's peer address,
# - 'x-forwarded-proto' is filled in only when absent, as 'https' for an
#   OpenSSL socket and 'http' otherwise.
#
# @param req [Object] request exposing +headers+ and +adapter.conn+
# @return [String] the resulting 'x-forwarded-proto' value
def inject_request_headers(req)
  headers = req.headers
  headers['x-request-id'] = SecureRandom.uuid

  socket = req.adapter.conn
  peer_address = socket.peeraddr(false)
  headers['x-forwarded-for'] = peer_address[2]

  headers['x-forwarded-proto'] ||=
    if socket.is_a?(OpenSSL::SSL::SSLSocket)
      'https'
    else
      'http'
    end
end

#mount(route, agent) ⇒ Object



195
196
197
198
199
200
201
202
# File 'lib/tipi/digital_fabric/service.rb', line 195

# Registers +agent+ under +route+ and flags the routing table for
# recompilation.
#
# When the route has a :path, a matching :path_regexp is stored on the route
# hash itself (the caller's hash is mutated). A route flagged :executive also
# makes the agent the current executive.
#
# @param route [Hash] route description (:host, :path, :executive are read)
# @param agent [Object] the agent to mount
# @return [true]
def mount(route, agent)
  mount_path = route[:path]
  route[:path_regexp] = path_regexp(mount_path) if mount_path

  @executive = agent if route[:executive]
  @agents[agent] = route
  @routing_changed = true
end

#path_regexp(path) ⇒ Object



240
241
242
# File 'lib/tipi/digital_fabric/service.rb', line 240

# Builds the regexp used to match request paths against a mounted path prefix.
#
# The pattern is anchored with \A (start of string) rather than ^ (start of
# any line), so a path containing a newline cannot match on a later line.
#
# NOTE(review): +path+ is interpolated unescaped, so regexp metacharacters in
# it stay active. Confirm whether callers rely on that before adding
# Regexp.escape.
#
# @param path [String] path prefix (treated as a regexp fragment)
# @return [Regexp] pattern matching strings that begin with +path+
def path_regexp(path)
  /\A#{path}/
end

#pid_cpu_and_rss(pid) ⇒ Object



84
85
86
87
88
89
90
91
92
# File 'lib/tipi/digital_fabric/service.rb', line 84

# Samples CPU usage and resident set size for a process by shelling out to
# +ps+ and parsing the first data row of its output.
#
# Failures are best-effort: any StandardError (missing +ps+, no such process,
# unparsable output) yields [nil, nil]. Polyphony control exceptions are
# re-raised, and other non-StandardError exceptions (SystemExit, Interrupt,
# NoMemoryError, ...) are no longer swallowed.
#
# @param pid [Integer] process id to inspect
# @return [Array(Float, Integer), Array(nil, nil)] the %cpu and rss columns
#   as reported by +ps+, or [nil, nil] on failure
def pid_cpu_and_rss(pid)
  output = `ps -p #{pid} -o %cpu,rss`
  # Line 0 is the column header; line 1 holds the values.
  cpu, rss = output.lines[1].split
  [cpu.to_f, rss.to_i]
rescue Polyphony::BaseException
  raise
rescue StandardError
  [nil, nil]
end

#record_latency_measurement(latency, req) ⇒ Object



118
119
120
121
122
123
124
125
# File 'lib/tipi/digital_fabric/service.rb', line 118

# Folds one request latency into the per-interval statistics (running sum,
# sample count and maximum) and prints a line to stdout for any request whose
# latency is not below 1.0.
#
# @param latency [Numeric] measured latency of the request
# @param req [Object] the request; its headers are printed for slow requests
# @return [nil]
def record_latency_measurement(latency, req)
  @http_latency_accumulator += latency
  @http_latency_counter += 1
  @http_latency_max = latency if latency > @http_latency_max

  puts format('slow request (%.1f): %p', latency, req.headers) unless latency < 1.0
end

#total_request_count ⇒ Object



108
109
110
111
112
113
114
115
116
# File 'lib/tipi/digital_fabric/service.rb', line 108

# Sums the in-flight request counts reported by the mounted agents. Agents
# that do not expose +current_request_count+ contribute nothing.
#
# @return [Integer] total of the agents' current request counts
def total_request_count
  @agents.keys.reduce(0) do |total, agent|
    next total unless agent.respond_to?(:current_request_count)

    total + agent.current_request_count
  end
end

#unmount(agent) ⇒ Object



204
205
206
207
208
209
210
211
# File 'lib/tipi/digital_fabric/service.rb', line 204

# Removes +agent+ from the service and flags the routing table for
# recompilation. Unknown agents are ignored. If the agent was mounted on an
# :executive route, the executive reference is cleared as well.
#
# @param agent [Object] the agent to remove
# @return [true, nil] true when the agent was mounted, nil otherwise
def unmount(agent)
  mounted_route = @agents[agent]
  if mounted_route
    @executive = nil if mounted_route[:executive]
    @agents.delete(agent)
    @routing_changed = true
  end
end

#upgrade_request(req, allow_df_upgrade) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/tipi/digital_fabric/service.rb', line 163

# Dispatches a protocol-upgrade request.
#
# A 'df' upgrade goes to #df_upgrade when allowed and is otherwise refused
# with SERVICE_UNAVAILABLE. Any other protocol is forwarded to the agent
# matching the request; when none matches, the error counter is bumped and
# SERVICE_UNAVAILABLE is returned.
#
# @param req [Object] the upgrade request
# @param allow_df_upgrade [Boolean] whether 'df' upgrades are accepted
# @return [Object] the result of the handler that ran
def upgrade_request(req, allow_df_upgrade)
  protocol = req.upgrade_protocol
  unavailable = -> { req.respond(nil, ':status' => Qeweney::Status::SERVICE_UNAVAILABLE) }

  if protocol == 'df'
    allow_df_upgrade ? df_upgrade(req) : unavailable.call
  elsif (agent = find_agent(req))
    agent.http_upgrade(req, protocol)
  else
    @counters[:errors] += 1
    unavailable.call
  end
end