Class: Sentry::Metrics::Aggregator

Inherits:
ThreadedPeriodicWorker show all
Defined in:
lib/sentry/metrics/aggregator.rb

Constant Summary collapse

FLUSH_INTERVAL =
5
ROLLUP_IN_SECONDS =
10
DEFAULT_STACKLEVEL =

This is how far removed from user code we are in the backtrace when we record code locations.

4
KEY_SANITIZATION_REGEX =
/[^a-zA-Z0-9_\-.]+/
UNIT_SANITIZATION_REGEX =
/[^a-zA-Z0-9_]+/
TAG_KEY_SANITIZATION_REGEX =
/[^a-zA-Z0-9_\-.\/]+/
TAG_VALUE_SANITIZATION_MAP =
{
  "\n" => "\\n",
  "\r" => "\\r",
  "\t" => "\\t",
  "\\" => "\\\\",
  "|" => "\\u{7c}",
  "," => "\\u{2c}"
}
METRIC_TYPES =
{
  c: CounterMetric,
  d: DistributionMetric,
  g: GaugeMetric,
  s: SetMetric
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from ThreadedPeriodicWorker

#ensure_thread, #kill

Constructor Details

#initialize(configuration, client) ⇒ Aggregator

Returns a new instance of Aggregator.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/sentry/metrics/aggregator.rb', line 36

# Sets up the aggregator's state and hands the logger and flush interval
# to the parent worker.
#
# @param configuration [Object] read for +logger+, +metrics.before_emit+,
#   +metrics.enable_code_locations+, +stacktrace_builder+, +release+ and
#   +environment+
# @param client [Object] kept as +@client+
def initialize(configuration, client)
  super(configuration.logger, FLUSH_INTERVAL)

  # collaborators and metric options
  @client = client
  @stacktrace_builder = configuration.stacktrace_builder
  metrics_options = configuration.metrics
  @before_emit = metrics_options.before_emit
  @enable_code_locations = metrics_options.enable_code_locations

  # only tags that actually have a value become defaults
  @default_tags = {
    "release" => configuration.release,
    "environment" => configuration.environment
  }.select { |_name, tag_value| tag_value }

  # guards the mutable state below
  @mutex = Mutex.new

  # nested hash: timestamp -> bucket keys -> Metric instance
  @buckets = {}

  # nested hash: timestamp (start of day) -> meta keys -> frame
  @code_locations = {}

  # shifted once per startup so that flushes are jittered
  @flush_shift = Random.rand * ROLLUP_IN_SECONDS
end

Instance Attribute Details

#buckets ⇒ Object (readonly)

exposed only for testing



34
35
36
# File 'lib/sentry/metrics/aggregator.rb', line 34

# Reader for the bucket store, exposed only for testing.
# The constructor initializes it to an empty hash, described there as a
# nested hash of timestamp -> bucket keys -> Metric instance.
def buckets
  @buckets
end

#client ⇒ Object (readonly)

exposed only for testing



34
35
36
# File 'lib/sentry/metrics/aggregator.rb', line 34

# Reader for the client passed to the constructor, exposed only for testing.
# #flush hands the finished envelope to this object via +capture_envelope+.
def client
  @client
end

#code_locations ⇒ Object (readonly)

exposed only for testing



34
35
36
# File 'lib/sentry/metrics/aggregator.rb', line 34

# Reader for the recorded code locations, exposed only for testing.
# The constructor initializes it to an empty hash, described there as a
# nested hash of timestamp (start of day) -> meta keys -> frame.
def code_locations
  @code_locations
end

#flush_shift ⇒ Object (readonly)

exposed only for testing



34
35
36
# File 'lib/sentry/metrics/aggregator.rb', line 34

# Reader for the flush jitter, exposed only for testing.
# The constructor sets it once to Random.rand * ROLLUP_IN_SECONDS.
def flush_shift
  @flush_shift
end

#thread ⇒ Object (readonly)

exposed only for testing



34
35
36
# File 'lib/sentry/metrics/aggregator.rb', line 34

# Reader for the worker thread, exposed only for testing.
# NOTE(review): @thread is not assigned in this class's constructor;
# presumably the ThreadedPeriodicWorker parent manages it - confirm.
def thread
  @thread
end

Instance Method Details

#add(type, key, value, unit: "none", tags: {}, timestamp: nil, stacklevel: nil) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/sentry/metrics/aggregator.rb', line 59

# Records a single metric value into its rollup bucket.
#
# Nothing is recorded and nil is returned when +ensure_thread+ returns a
# falsey value, when +type+ is not a key of METRIC_TYPES, or when a
# +before_emit+ callback is configured and returns a falsey value for
# the key and its updated tags.
#
# @param type [Symbol] metric type; must be a key of METRIC_TYPES
# @param key [Object] metric key, part of the bucket key
# @param value [Object] value handed to +process_bucket+
# @param unit [String] metric unit, part of the bucket key
# @param tags [Hash] tags passed through +get_updated_tags+ before use
# @param timestamp [#to_i, nil] time of the measurement; defaults to
#   +Sentry.utc_now+
# @param stacklevel [Integer, nil] forwarded to +record_code_location+
# @return [Object, nil] the result of +process_span_aggregator+, or nil
#   when the metric is dropped
def add(type,
        key,
        value,
        unit: "none",
        tags: {},
        timestamp: nil,
        stacklevel: nil)
  return unless ensure_thread
  # direct hash lookup; avoids building an array of keys on every call
  return unless METRIC_TYPES.key?(type)

  updated_tags = get_updated_tags(tags)
  return if @before_emit && !@before_emit.call(key, updated_tags)

  timestamp ||= Sentry.utc_now

  # this is integer division and thus takes the floor of the division
  # and buckets into 10 second intervals
  bucket_timestamp = (timestamp.to_i / ROLLUP_IN_SECONDS) * ROLLUP_IN_SECONDS

  serialized_tags = serialize_tags(updated_tags)
  bucket_key = [type, key, unit, serialized_tags]

  # code locations and buckets are only touched while holding the mutex
  added = @mutex.synchronize do
    record_code_location(type, key, unit, timestamp, stacklevel: stacklevel) if @enable_code_locations
    process_bucket(bucket_timestamp, bucket_key, type, value)
  end

  # for sets, we pass on if there was a new entry to the local gauge
  local_value = type == :s ? added : value
  process_span_aggregator(bucket_key, local_value)
end

#flush(force: false) ⇒ Object Also known as: run



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/sentry/metrics/aggregator.rb', line 91

# Packs the flushable buckets and the collected code locations into one
# envelope and passes it to the client.
#
# @param force [Boolean] forwarded to +get_flushable_buckets!+
# @return [Object, nil] the result of +@client.capture_envelope+, or nil
#   when there is nothing to send
def flush(force: false)
  ready_buckets = get_flushable_buckets!(force)
  locations_by_timestamp = get_code_locations!
  return if ready_buckets.empty? && locations_by_timestamp.empty?

  envelope = Envelope.new

  # all flushable buckets travel together in a single statsd item
  unless ready_buckets.empty?
    statsd_payload = serialize_buckets(ready_buckets)
    statsd_headers = { type: "statsd", length: statsd_payload.bytesize }
    envelope.add_item(statsd_headers, statsd_payload)
  end

  # one metric_meta item per timestamp; iterating an empty collection adds nothing
  locations_by_timestamp.each do |timestamp, locations|
    meta_payload = serialize_locations(timestamp, locations)
    meta_headers = { type: "metric_meta", content_type: "application/json" }
    envelope.add_item(meta_headers, meta_payload)
  end

  @client.capture_envelope(envelope)
end