Class: Sentry::Metrics::Aggregator

Inherits:
ThreadedPeriodicWorker show all
Defined in:
lib/sentry/metrics/aggregator.rb

Constant Summary collapse

# Passed to the ThreadedPeriodicWorker constructor as its second argument.
# NOTE(review): presumably the worker interval in seconds — confirm in ThreadedPeriodicWorker.
FLUSH_INTERVAL =
5
# Width, in seconds, of the time buckets metrics are rolled up into; also the
# upper bound of the random flush shift picked once per aggregator instance.
ROLLUP_IN_SECONDS =
10
DEFAULT_STACKLEVEL =

This is how far removed we are from user code in the backtrace when we record code locations.

4
# Matches any run of characters other than ASCII letters, digits, "_", "-" and ".".
# NOTE(review): presumably applied to metric keys — the call site is not shown here.
KEY_SANITIZATION_REGEX =
/[^a-zA-Z0-9_\-.]+/
# Matches any run of characters other than ASCII letters, digits and "_".
# NOTE(review): presumably applied to metric units — the call site is not shown here.
UNIT_SANITIZATION_REGEX =
/[^a-zA-Z0-9_]+/
# Matches any run of characters other than ASCII letters, digits, "_", "-", "." and "/".
# NOTE(review): presumably applied to tag keys — the call site is not shown here.
TAG_KEY_SANITIZATION_REGEX =
/[^a-zA-Z0-9_\-.\/]+/
# Replacement table for characters in tag values: each key is a single raw
# character and each value is the escaped text that stands in for it
# (e.g. a newline becomes the two characters backslash + "n", and "|" becomes
# the literal text \u{7c}).
# Frozen so the shared lookup table cannot be mutated at runtime.
# NOTE(review): how the table is applied (e.g. via gsub) is not shown here.
TAG_VALUE_SANITIZATION_MAP = {
  "\n" => "\\n",
  "\r" => "\\r",
  "\t" => "\\t",
  "\\" => "\\\\",
  "|" => "\\u{7c}",
  "," => "\\u{2c}"
}.freeze
# Maps the short metric type symbol accepted by #add to the metric class
# that handles it. Frozen so the set of supported types cannot be altered
# at runtime.
METRIC_TYPES = {
  c: CounterMetric,
  d: DistributionMetric,
  g: GaugeMetric,
  s: SetMetric
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from ThreadedPeriodicWorker

#ensure_thread, #kill

Constructor Details

#initialize(configuration, client) ⇒ Aggregator

Returns a new instance of Aggregator.

[View source]

36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/sentry/metrics/aggregator.rb', line 36

# Sets up an aggregator with empty state.
#
# @param configuration [Object] read for its logger, metrics settings
#   (before_emit, enable_code_locations), stacktrace_builder, release and
#   environment
# @param client [Object] stored as-is; later used to capture envelopes
def initialize(configuration, client)
  super(configuration.logger, FLUSH_INTERVAL)

  @client = client
  @before_emit = configuration.metrics.before_emit
  @enable_code_locations = configuration.metrics.enable_code_locations
  @stacktrace_builder = configuration.stacktrace_builder

  # release / environment are included only when set on the configuration
  @default_tags = {}.tap do |defaults|
    defaults["release"] = configuration.release if configuration.release
    defaults["environment"] = configuration.environment if configuration.environment
  end

  @mutex = Mutex.new

  # timestamp -> bucket keys -> Metric instance
  @buckets = {}

  # random offset chosen once per startup so flushes are jittered
  @flush_shift = ROLLUP_IN_SECONDS * Random.rand

  # timestamp (start of day) -> meta keys -> frame
  @code_locations = {}
end

Instance Attribute Details

#bucketsObject (readonly)

exposed only for testing


34
35
36
# File 'lib/sentry/metrics/aggregator.rb', line 34

# Exposed only for testing.
#
# @return [Hash] nested hash of timestamp -> bucket keys -> Metric instance
def buckets
  @buckets
end

#clientObject (readonly)

exposed only for testing


34
35
36
# File 'lib/sentry/metrics/aggregator.rb', line 34

# Exposed only for testing.
#
# @return [Object] the client that was passed to the constructor
def client
  @client
end

#code_locationsObject (readonly)

exposed only for testing


34
35
36
# File 'lib/sentry/metrics/aggregator.rb', line 34

# Exposed only for testing.
#
# @return [Hash] nested hash of timestamp (start of day) -> meta keys -> frame
def code_locations
  @code_locations
end

#flush_shiftObject (readonly)

exposed only for testing


34
35
36
# File 'lib/sentry/metrics/aggregator.rb', line 34

# Exposed only for testing.
#
# @return [Float] jitter offset computed once in the constructor as
#   Random.rand * ROLLUP_IN_SECONDS
def flush_shift
  @flush_shift
end

#threadObject (readonly)

exposed only for testing


34
35
36
# File 'lib/sentry/metrics/aggregator.rb', line 34

# Exposed only for testing.
#
# NOTE(review): @thread is not assigned in this class's constructor; presumably
# it is managed by ThreadedPeriodicWorker (see #ensure_thread / #kill) — confirm.
#
# @return [Object, nil] the current value of @thread
def thread
  @thread
end

Instance Method Details

#add(type, key, value, unit: "none", tags: {}, timestamp: nil, stacklevel: nil) ⇒ Object

[View source]

59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/sentry/metrics/aggregator.rb', line 59

# Records a single metric value into the time bucket it belongs to.
#
# Does nothing (returns nil) when the worker thread cannot be ensured, when
# +type+ is not a key of METRIC_TYPES, or when a before_emit callback is
# configured and returns a falsy value for this key and tag set.
#
# @param type [Symbol] metric type; must be a key of METRIC_TYPES (:c, :d, :g, :s)
# @param key [Object] metric name; becomes part of the bucket key
# @param value [Object] value handed to process_bucket
# @param unit [String] metric unit; becomes part of the bucket key
# @param tags [Hash] tags passed through get_updated_tags before use
# @param timestamp [#to_i, nil] time of the measurement; defaults to Sentry.utc_now
# @param stacklevel [Integer, nil] forwarded to record_code_location
# @return [Object, nil] the result of process_span_aggregator, or nil on early exit
def add(type,
        key,
        value,
        unit: "none",
        tags: {},
        timestamp: nil,
        stacklevel: nil)
  return unless ensure_thread
  # direct hash lookup; avoids building a throwaway keys array on every call
  return unless METRIC_TYPES.key?(type)

  updated_tags = get_updated_tags(tags)
  return if @before_emit && !@before_emit.call(key, updated_tags)

  timestamp ||= Sentry.utc_now

  # this is integer division and thus takes the floor of the division
  # and buckets into 10 second intervals
  bucket_timestamp = (timestamp.to_i / ROLLUP_IN_SECONDS) * ROLLUP_IN_SECONDS

  serialized_tags = serialize_tags(updated_tags)
  bucket_key = [type, key, unit, serialized_tags]

  # code-location recording and bucket mutation happen under the same lock
  added = @mutex.synchronize do
    record_code_location(type, key, unit, timestamp, stacklevel: stacklevel) if @enable_code_locations
    process_bucket(bucket_timestamp, bucket_key, type, value)
  end

  # for sets, we pass on if there was a new entry to the local gauge
  local_value = type == :s ? added : value
  process_span_aggregator(bucket_key, local_value)
end

#flush(force: false) ⇒ Object Also known as: run

[View source]

91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/sentry/metrics/aggregator.rb', line 91

# Collects flushable metric buckets and recorded code locations and sends
# them to the client in a single envelope. Returns early without creating an
# envelope when there is nothing to send.
#
# @param force [Boolean] forwarded to get_flushable_buckets!
# @return [Object, nil] the result of @client.capture_envelope, or nil when
#   there was nothing to send
def flush(force: false)
  pending_buckets = get_flushable_buckets!(force)
  pending_locations = get_code_locations!
  return if pending_buckets.empty? && pending_locations.empty?

  envelope = Envelope.new

  unless pending_buckets.empty?
    statsd_payload = serialize_buckets(pending_buckets)
    statsd_headers = { type: "statsd", length: statsd_payload.bytesize }
    envelope.add_item(statsd_headers, statsd_payload)
  end

  # one metric_meta item per timestamp; iterating an empty collection is a no-op
  pending_locations.each do |location_timestamp, locations|
    meta_payload = serialize_locations(location_timestamp, locations)
    meta_headers = { type: "metric_meta", content_type: "application/json" }
    envelope.add_item(meta_headers, meta_payload)
  end

  @client.capture_envelope(envelope)
end