Class: Flipper::Cloud::Telemetry

Inherits:
Object
  • Object
show all
Defined in:
lib/flipper/cloud/telemetry.rb,
lib/flipper/cloud/telemetry/metric.rb,
lib/flipper/cloud/telemetry/submitter.rb,
lib/flipper/cloud/telemetry/instrumenter.rb,
lib/flipper/cloud/telemetry/backoff_policy.rb,
lib/flipper/cloud/telemetry/metric_storage.rb

Defined Under Namespace

Classes: BackoffPolicy, Instrumenter, Metric, MetricStorage, Submitter

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cloud_configuration) ⇒ Telemetry

Returns a new instance of Telemetry.



52
53
54
55
56
57
58
59
60
# File 'lib/flipper/cloud/telemetry.rb', line 52

# Builds a telemetry instance for the given cloud configuration, starts its
# background tasks right away and registers an at_exit hook that stops them.
def initialize(cloud_configuration)
  # Remember the pid of the process that created this instance.
  @pid = Process.pid
  @cloud_configuration = cloud_configuration

  # Both settings can be overridden through ENV; values are coerced to floats.
  self.interval = ENV.fetch("FLIPPER_TELEMETRY_INTERVAL", 60).to_f
  self.shutdown_timeout = ENV.fetch("FLIPPER_TELEMETRY_SHUTDOWN_TIMEOUT", 5).to_f

  # Default submitter: hand every drained batch to a new Submitter.
  self.submitter = lambda do |drained|
    Submitter.new(@cloud_configuration).call(drained)
  end

  start
  at_exit { stop }
end

Instance Attribute Details

#cloud_configuration ⇒ Object (readonly)

Public: The cloud configuration to use for this telemetry instance.



32
33
34
# File 'lib/flipper/cloud/telemetry.rb', line 32

# Returns the cloud configuration held in @cloud_configuration.
def cloud_configuration
  @cloud_configuration
end

#interval ⇒ Object

Internal: The interval in seconds for how often telemetry should be sent to cloud.



44
45
46
# File 'lib/flipper/cloud/telemetry.rb', line 44

# Returns the value held in @interval.
def interval
  @interval
end

#metric_storage ⇒ Object (readonly)

Internal: Where the metrics are stored between cloud submissions.



35
36
37
# File 'lib/flipper/cloud/telemetry.rb', line 35

# Returns the metric storage held in @metric_storage.
def metric_storage
  @metric_storage
end

#pool ⇒ Object (readonly)

Internal: The pool of background threads that submits metrics to cloud.



38
39
40
# File 'lib/flipper/cloud/telemetry.rb', line 38

# Returns the thread pool held in @pool.
def pool
  @pool
end

#shutdown_timeout ⇒ Object

Internal: The timeout in seconds for how long to wait for the pool to shut down.



47
48
49
# File 'lib/flipper/cloud/telemetry.rb', line 47

# Returns the value held in @shutdown_timeout.
def shutdown_timeout
  @shutdown_timeout
end

#submitter ⇒ Object

Internal: The proc that is called to submit metrics to cloud.



50
51
52
# File 'lib/flipper/cloud/telemetry.rb', line 50

# Returns the callable held in @submitter.
def submitter
  @submitter
end

#timer ⇒ Object (readonly)

Internal: The timer that triggers draining the metrics to the pool.



41
42
43
# File 'lib/flipper/cloud/telemetry.rb', line 41

# Returns the timer task held in @timer.
def timer
  @timer
end

Class Method Details

.instance_for(cloud_configuration) ⇒ Object

Internal: Fetch an instance of telemetry once per process per url + token (aka cloud endpoint). Should only ever be one instance unless you are doing some funky stuff.



25
26
27
28
29
# File 'lib/flipper/cloud/telemetry.rb', line 25

# Looks up the telemetry instance registered under the configuration's
# url + token, building and registering a new one when none is present.
def self.instance_for(cloud_configuration)
  endpoint_key = cloud_configuration.url + cloud_configuration.token
  instances.compute_if_absent(endpoint_key) { new(cloud_configuration) }
end

.reset ⇒ Object



18
19
20
# File 'lib/flipper/cloud/telemetry.rb', line 18

# Stops every registered telemetry instance, then empties the registry.
def self.reset
  registry = instances
  registry.each { |_key, telemetry| telemetry.stop }
  registry.clear
end

Instance Method Details

#record(name, payload) ⇒ Object

Public: Records telemetry events based on Active Support notifications.



63
64
65
66
67
68
69
70
# File 'lib/flipper/cloud/telemetry.rb', line 63

# Counts one `enabled?` check for a feature. Events with any other name or
# operation are ignored and return nil without touching the storage.
def record(name, payload)
  return unless name == Flipper::Feature::InstrumentationName &&
                payload[:operation] == :enabled?

  detect_forking

  # The metric is built from the frozen feature name string and the result.
  feature_key = payload[:feature_name].to_s.freeze
  @metric_storage.increment(Metric.new(feature_key, payload[:result]))
end

#restart ⇒ Object

Public: Restart all the tasks and reset the storage.



115
116
117
118
# File 'lib/flipper/cloud/telemetry.rb', line 115

# Calls stop and then start, in that order.
def restart
  stop
  start
end

#start ⇒ Object

Public: Start all the tasks and set up new metric storage.



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/flipper/cloud/telemetry.rb', line 73

# Creates fresh metric storage, a two-thread pool and a timer task that
# calls post_to_pool every `interval`. Returns the timer task.
def start
  info "action=start"

  @metric_storage = MetricStorage.new

  # Pool settings: queue capped at five, fallback policy set to :discard.
  pool_options = {
    max_queue: 5,
    fallback_policy: :discard,
    name: "flipper-telemetry-post-to-cloud-pool".freeze,
  }
  @pool = Concurrent::FixedThreadPool.new(2, pool_options)

  timer_options = {
    execution_interval: interval,
    name: "flipper-telemetry-post-to-pool-timer".freeze,
  }
  @timer = Concurrent::TimerTask.execute(timer_options) { post_to_pool }
end

#stop ⇒ Object

Public: Shuts down all the tasks and tries to flush any remaining info to Cloud.



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/flipper/cloud/telemetry.rb', line 91

# Shuts down the timer and then the pool, posting to the pool one final time
# in between. Either step is skipped when its object was never created.
def stop
  info "action=stop"

  # The timer only drains in memory metric storage and posts to the pool of
  # background workers, so there is no need to wait long for it.
  if @timer
    debug "action=timer_shutdown_start"
    @timer.shutdown
    timer_stopped = @timer.wait_for_termination(1)
    @timer.kill unless timer_stopped
    debug "action=timer_shutdown_end result=#{timer_stopped}"
  end

  return unless @pool

  post_to_pool # one last drain before the pool goes away
  debug "action=pool_shutdown_start"
  @pool.shutdown
  pool_stopped = @pool.wait_for_termination(@shutdown_timeout)
  # Kill the pool if it did not terminate within the shutdown timeout.
  @pool.kill unless pool_stopped
  debug "action=pool_shutdown_end result=#{pool_stopped}"
end