Class: Sentry::Metrics::Aggregator
- Inherits:
-
ThreadedPeriodicWorker
- Object
- ThreadedPeriodicWorker
- Sentry::Metrics::Aggregator
- Defined in:
- lib/sentry/metrics/aggregator.rb
Constant Summary collapse
- FLUSH_INTERVAL =
5
- ROLLUP_IN_SECONDS =
10
- DEFAULT_STACKLEVEL =
The number of stack frames separating this code from user code in the backtrace when code locations are recorded.
4
- KEY_SANITIZATION_REGEX =
/[^a-zA-Z0-9_\-.]+/
- UNIT_SANITIZATION_REGEX =
/[^a-zA-Z0-9_]+/
- TAG_KEY_SANITIZATION_REGEX =
/[^a-zA-Z0-9_\-.\/]+/
- TAG_VALUE_SANITIZATION_MAP =
{ "\n" => "\\n", "\r" => "\\r", "\t" => "\\t", "\\" => "\\\\", "|" => "\\u{7c}", "," => "\\u{2c}" }
- METRIC_TYPES =
{ c: CounterMetric, d: DistributionMetric, g: GaugeMetric, s: SetMetric }
Instance Attribute Summary collapse
-
#buckets ⇒ Object
readonly
exposed only for testing.
-
#client ⇒ Object
readonly
exposed only for testing.
-
#code_locations ⇒ Object
readonly
exposed only for testing.
-
#flush_shift ⇒ Object
readonly
exposed only for testing.
-
#thread ⇒ Object
readonly
exposed only for testing.
Instance Method Summary collapse
- #add(type, key, value, unit: "none", tags: {}, timestamp: nil, stacklevel: nil) ⇒ Object
- #flush(force: false) ⇒ Object (also: #run)
-
#initialize(configuration, client) ⇒ Aggregator
constructor
A new instance of Aggregator.
Methods inherited from ThreadedPeriodicWorker
Constructor Details
permalink #initialize(configuration, client) ⇒ Aggregator
Returns a new instance of Aggregator.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/sentry/metrics/aggregator.rb', line 36
# Sets up an empty aggregator.
#
# The logger and FLUSH_INTERVAL are handed to the ThreadedPeriodicWorker constructor.
#
# @param configuration [Object] read for logger, metrics.before_emit,
#   metrics.enable_code_locations, stacktrace_builder, release and environment
# @param client [Object] stored in @client; #flush calls capture_envelope on it
def initialize(configuration, client)
  super(configuration.logger, FLUSH_INTERVAL)

  metrics_settings = configuration.metrics

  @client = client
  @before_emit = metrics_settings.before_emit
  @enable_code_locations = metrics_settings.enable_code_locations
  @stacktrace_builder = configuration.stacktrace_builder

  # only tags whose configured value is truthy end up in the defaults
  @default_tags = {
    "release" => configuration.release,
    "environment" => configuration.environment
  }.select { |_tag_name, tag_value| tag_value }

  @mutex = Mutex.new

  # nested hash: timestamp -> bucket keys -> Metric instance
  @buckets = {}

  # nested hash: timestamp (start of day) -> meta keys -> frame
  @code_locations = {}

  # drawn once per startup so that the flush interval is jittered
  @flush_shift = Random.rand * ROLLUP_IN_SECONDS
end
Instance Attribute Details
permalink #buckets ⇒ Object (readonly)
exposed only for testing
34 35 36 |
# Reader for @buckets, which the constructor initializes as a nested hash of timestamp -> bucket keys -> Metric instance.
# File 'lib/sentry/metrics/aggregator.rb', line 34 def buckets @buckets end |
permalink #client ⇒ Object (readonly)
exposed only for testing
34 35 36 |
# Reader for @client, the client object given to the constructor; #flush calls capture_envelope on it.
# File 'lib/sentry/metrics/aggregator.rb', line 34 def client @client end |
permalink #code_locations ⇒ Object (readonly)
exposed only for testing
34 35 36 |
# Reader for @code_locations, which the constructor initializes as a nested hash of timestamp (start of day) -> meta keys -> frame.
# File 'lib/sentry/metrics/aggregator.rb', line 34 def code_locations @code_locations end |
permalink #flush_shift ⇒ Object (readonly)
exposed only for testing
34 35 36 |
# Reader for @flush_shift, the jitter offset computed once in the constructor as Random.rand * ROLLUP_IN_SECONDS.
# File 'lib/sentry/metrics/aggregator.rb', line 34 def flush_shift @flush_shift end |
permalink #thread ⇒ Object (readonly)
exposed only for testing
34 35 36 |
# Reader for @thread. NOTE(review): @thread is not assigned in the constructor shown for this class; presumably it is managed by ThreadedPeriodicWorker — confirm.
# File 'lib/sentry/metrics/aggregator.rb', line 34 def thread @thread end |
Instance Method Details
permalink #add(type, key, value, unit: "none", tags: {}, timestamp: nil, stacklevel: nil) ⇒ Object
[View source]
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/sentry/metrics/aggregator.rb', line 59
# Records a single metric value into the rollup bucket matching its timestamp.
#
# NOTE(review): the rendered listing had lost every `tags` / `timestamp` local and the
# names of the two tag helpers. The locals are restored from the surrounding code; the
# helper names `get_updated_tags` and `serialize_tags` are assumptions — confirm them
# against lib/sentry/metrics/aggregator.rb before relying on this listing.
#
# @param type [Symbol] a key of METRIC_TYPES (:c, :d, :g or :s); any other value is ignored
# @param key [Object] metric name; passed to the before_emit callback and used in the bucket key
# @param value [Object] the measurement handed to process_bucket
# @param unit [String] unit of the measurement, part of the bucket key
# @param tags [Hash] tags for this measurement
# @param timestamp [Object, nil] time of the measurement; defaults to Sentry.utc_now
# @param stacklevel [Integer, nil] forwarded to record_code_location
# @return [Object, nil] nil when the metric is dropped early, otherwise whatever
#   process_span_aggregator returns
def add(type, key, value, unit: "none", tags: {}, timestamp: nil, stacklevel: nil)
  return unless ensure_thread
  return unless METRIC_TYPES.key?(type)

  updated_tags = get_updated_tags(tags)
  # a before_emit callback returning a falsy value drops the metric
  return if @before_emit && !@before_emit.call(key, updated_tags)

  timestamp ||= Sentry.utc_now

  # this is integer division and thus takes the floor of the division
  # and buckets into 10 second intervals
  bucket_timestamp = (timestamp.to_i / ROLLUP_IN_SECONDS) * ROLLUP_IN_SECONDS

  serialized_tags = serialize_tags(updated_tags)
  bucket_key = [type, key, unit, serialized_tags]

  added = @mutex.synchronize do
    record_code_location(type, key, unit, timestamp, stacklevel: stacklevel) if @enable_code_locations
    process_bucket(bucket_timestamp, bucket_key, type, value)
  end

  # for sets, we pass on if there was a new entry to the local gauge
  local_value = type == :s ? added : value
  process_span_aggregator(bucket_key, local_value)
end
permalink #flush(force: false) ⇒ Object Also known as: run
[View source]
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/sentry/metrics/aggregator.rb', line 91
# Packs the flushable buckets and the collected code locations into one envelope and
# hands it to the client. Nothing is sent when both are empty.
#
# The rendered listing had dropped the first block parameter and the first argument to
# serialize_locations; both are restored as `timestamp`, matching the constructor's
# description of @code_locations as keyed by timestamp (start of day).
#
# @param force [Boolean] forwarded to get_flushable_buckets!
#   (presumably flushes buckets regardless of age — confirm against that helper)
# @return [Object, nil] nil when there is nothing to send, otherwise the result of
#   @client.capture_envelope
def flush(force: false)
  flushable_buckets = get_flushable_buckets!(force)
  code_locations = get_code_locations!
  return if flushable_buckets.empty? && code_locations.empty?

  envelope = Envelope.new

  # all flushable buckets go out as a single "statsd" item
  unless flushable_buckets.empty?
    payload = serialize_buckets(flushable_buckets)
    envelope.add_item(
      { type: "statsd", length: payload.bytesize },
      payload
    )
  end

  # one "metric_meta" item per timestamp key
  unless code_locations.empty?
    code_locations.each do |timestamp, locations|
      payload = serialize_locations(timestamp, locations)
      envelope.add_item(
        { type: "metric_meta", content_type: "application/json" },
        payload
      )
    end
  end

  @client.capture_envelope(envelope)
end