Class: A2A::Monitoring::MetricsCollector

Inherits:
Object
Includes:
MonitorMixin
Defined in:
lib/a2a/monitoring.rb,
lib/a2a/monitoring/metrics_collector.rb

Overview

Metrics collection interface

Constant Summary

Metric types:

COUNTER = :counter
GAUGE = :gauge
HISTOGRAM = :histogram
TIMER = :timer

Instance Attribute Summary

Instance Method Summary

Constructor Details

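#initialize(flush_interval: 60, retention_period: 3600) ⇒ MetricsCollector

Initialize metrics collector

Parameters:

  • flush_interval (defaults to: 60)

    Interval to flush metrics (seconds)

  • retention_period (defaults to: 3600)

    How long to keep metrics (seconds)

Note: the source listing below is the initialize(config) constructor from lib/a2a/monitoring.rb, which takes a single config object instead of these keyword arguments.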



109
110
111
112
113
114
115
116
# File 'lib/a2a/monitoring.rb', line 109

# Build a collector bound to +config+ and register its default backends.
#
# @param config [Object] configuration object; stored as @config, read for
#   its logger in #flush_if_needed and passed to LoggingBackend in
#   #setup_backends
def initialize(config)
  @config = config

  # Lock guarding the in-memory buffer of recorded metric hashes.
  @mutex = Mutex.new
  @metrics_buffer = []
  @backends = []

  setup_backends
end

Instance Attribute Details

#metrics ⇒ Object (readonly)

Returns the value of attribute metrics.



27
28
29
# File 'lib/a2a/monitoring/metrics_collector.rb', line 27

# Reader for the live metrics store: a Hash of metric-key => metric data.
#
# NOTE: this returns the internal Hash itself, not a copy. Use #snapshot
# when an isolated copy is needed.
def metrics
  @metrics
end

#start_time ⇒ Object (readonly)

Returns the value of attribute start_time.



27
28
29
# File 'lib/a2a/monitoring/metrics_collector.rb', line 27

# Reader for the collector's start timestamp. #reset! restarts it with
# Time.now, and #summary uses it to compute uptime.
def start_time
  @start_time
end

Instance Method Details

#add_alert(name, metric:, condition:, threshold:, &callback) ⇒ Object

Add an alert rule

Parameters:

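  • name — Alert name

  • metric — Metric name to monitor

  • condition — Condition (:gt, :lt, :eq, :gte, :lte)

  • threshold — Threshold value

  • callback — Callback to execute when the alert fires; called with (name, metric_name, value, tags)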



197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/a2a/monitoring/metrics_collector.rb', line 197

# Register an alert rule; #check_alerts evaluates it against matching
# samples.
#
# @param name [String, Symbol] rule label, passed back to the callback
# @param metric [String] metric name the rule watches
# @param condition [Symbol] one of :gt, :gte, :lt, :lte, :eq
# @param threshold [Numeric] value each sample is compared against
# @yield [name, metric_name, value, tags] invoked when the rule fires
# @return [Array<Hash>] the rule list after appending
def add_alert(name, metric:, condition:, threshold:, &callback)
  rule = {
    name: name,
    metric: metric,
    condition: condition,
    threshold: threshold,
    callback: callback,
    last_fired: nil # never fired yet; drives the cooldown in #check_alerts
  }

  synchronize { @alert_rules.push(rule) }
end

#add_backend(backend) ⇒ Object

Add metrics backend

Parameters:

  • Metrics backend



155
156
157
# File 'lib/a2a/monitoring.rb', line 155

# Register an extra metrics backend. #record forwards every later sample
# to it via backend.record(name, value, **labels).
#
# @param backend [#record]
# @return [Array] the backend list
def add_backend(backend)
  @backends.push(backend)
end

#add_exporter(exporter) ⇒ Object

Add a metrics exporter

Parameters:

  • Exporter object with export(metrics) method



177
178
179
# File 'lib/a2a/monitoring/metrics_collector.rb', line 177

# Register an exporter while holding the collector's monitor.
#
# @param exporter [#export] object responding to export(metrics)
# @return [Array] the exporter list
def add_exporter(exporter)
  synchronize do
    @exporters.push(exporter)
  end
end

#build_metric_key(name, tags) ⇒ String (private)

Build metric key from name and tags

Parameters:

  • Metric name

  • Metric tags

Returns:

  • Metric key



301
302
303
304
305
306
307
308
# File 'lib/a2a/monitoring/metrics_collector.rb', line 301

# Build the storage key for a metric from its name and tags.
#
# Tags are ordered by the string form of their keys, so the same tag set
# always yields the same key regardless of insertion order. Comparing raw
# keys would raise ArgumentError when a tag hash mixes Symbol and String
# keys (e.g. `**{"region" => "eu"}, env: :prod`). For all-Symbol keys the
# order is unchanged, because Symbol#<=> compares the symbol names. The
# class name is a tie-breaker for :a vs "a".
#
# @param name [String] metric name
# @param tags [Hash] metric tags
# @return [String] +name+ itself when there are no tags, otherwise
#   "name{k1=v1,k2=v2}"
def build_metric_key(name, tags)
  return name if tags.empty?

  ordered = tags.sort_by { |key, _value| [key.to_s, key.class.name] }
  tag_string = ordered.map { |key, value| "#{key}=#{value}" }.join(",")
  "#{name}{#{tag_string}}"
end

#check_alerts(metric_name, value, tags) ⇒ Object (private)

Check alert rules

Parameters:

  • Metric name

  • Metric value

  • Metric tags



354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
# File 'lib/a2a/monitoring/metrics_collector.rb', line 354

# Evaluate each alert rule that watches +metric_name+ against +value+.
#
# A rule whose condition holds invokes its callback, but at most once per
# 60 seconds (measured with Time.now from the rule's :last_fired). An
# unrecognised condition never fires.
#
# @param metric_name [String]
# @param value [Numeric] the sample just recorded
# @param tags [Hash] forwarded unchanged to the callback
# @return [Array<Hash>] the rule list
def check_alerts(metric_name, value, tags)
  comparators = { gt: :>, gte: :>=, lt: :<, lte: :<=, eq: :== }

  @alert_rules.each do |rule|
    next unless rule[:metric] == metric_name

    operator = comparators[rule[:condition]]
    triggered = operator ? value.public_send(operator, rule[:threshold]) : false
    next unless triggered

    previous = rule[:last_fired]
    next unless previous.nil? || Time.now - previous > 60

    rule[:callback]&.call(rule[:name], metric_name, value, tags)
    rule[:last_fired] = Time.now
  end
end

#cleanup_old_metrics ⇒ Object (private)

Remove metrics whose last update is older than the retention period.



402
403
404
405
406
407
408
409
410
# File 'lib/a2a/monitoring/metrics_collector.rb', line 402

# Drop every metric whose :last_updated is earlier than
# (now - @retention_period).
#
# @return [Hash, nil] the metrics store when something was removed, nil
#   otherwise (Hash#reject! semantics)
def cleanup_old_metrics
  oldest_allowed = Time.now - @retention_period

  synchronize do
    @metrics.reject! { |_key, entry| entry[:last_updated] < oldest_allowed }
  end
end

#current_metrics ⇒ Array<Hash>

Get current metrics

Returns:

  • Current metrics buffer



161
162
163
# File 'lib/a2a/monitoring.rb', line 161

# Return a copy of the metric buffer taken while holding @mutex.
#
# @return [Array<Hash>] a shallow copy: the array is new, but the metric
#   hashes inside it are shared with the buffer
def current_metrics
  @mutex.synchronize do
    @metrics_buffer.dup
  end
end

#deep_copy(obj) ⇒ Object (private)

Deep copy a hash

Parameters:

  • Object to copy

Returns:

  • Deep copy



431
432
433
434
435
# File 'lib/a2a/monitoring/metrics_collector.rb', line 431

# Deep-copy +obj+ with a Marshal round-trip.
#
# If Marshal cannot serialise the object graph (for example because it
# contains a Proc), this falls back to a shallow #dup. In that case the
# result may share nested objects with +obj+.
#
# @param obj [Object]
# @return [Object]
def deep_copy(obj)
  serialized = Marshal.dump(obj)
  Marshal.load(serialized)
rescue StandardError
  obj.dup
end

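#flush! ⇒ Object

Flush metrics to exporters.

The lib/a2a/monitoring.rb implementation shown below only clears the in-memory buffer; each metric has already been forwarded to the backends by #record.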



166
167
168
169
170
# File 'lib/a2a/monitoring.rb', line 166

# Empty the in-memory metric buffer while holding @mutex.
#
# Nothing is exported here. #record has already forwarded each sample to
# the backends, so this only discards the local copies.
#
# @return [Array] the buffer, now empty
def flush!
  @mutex.synchronize { @metrics_buffer.clear }
end

#flush_if_needed ⇒ Object (private)



182
183
184
185
186
187
# File 'lib/a2a/monitoring.rb', line 182

# Log the buffer size at debug level once the buffer holds 100 or more
# entries. #record calls this while holding @mutex.
#
# NOTE(review): nothing is flushed or trimmed here. The buffer keeps growing
# until #flush! is called, and the debug line repeats on every record past
# the threshold. Confirm this placeholder behaviour is intended.
def flush_if_needed
  @config.logger&.debug("Metrics buffer size: #{@metrics_buffer.size}") if @metrics_buffer.size >= 100
end

#gauge(name, value, **tags) ⇒ Object

Set a gauge metric value

Parameters:

  • Metric name

  • Metric value

  • Metric tags



70
71
72
73
74
75
76
77
78
# File 'lib/a2a/monitoring/metrics_collector.rb', line 70

# Set a gauge to +value+, stamp its update time, and evaluate alert rules
# against the new value. All of this happens while holding the collector's
# monitor.
#
# @param name [String] metric name
# @param value [Numeric] new gauge value; replaces the previous one
# @param tags [Hash] metric tags
def gauge(name, value, **tags)
  synchronize do
    entry = get_or_create_metric(name, GAUGE, tags)
    entry.merge!(value: value, last_updated: Time.now)
    check_alerts(name, value, tags)
  end
end

#get_memory_usage ⇒ Integer (private)

Get current memory usage

Returns:

  • Memory usage in bytes



416
417
418
419
420
421
422
423
424
# File 'lib/a2a/monitoring/metrics_collector.rb', line 416

# Estimate the size of the Ruby object heap in bytes as
# allocated-heap-pages * heap-page-size.
#
# GC.stat has no :heap_page_size key, and it raises ArgumentError for
# unknown keys. So the page size is read from GC::INTERNAL_CONSTANTS
# (HEAP_PAGE_SIZE). On Rubies that do not expose it, the fallback is
# objects-per-page * slot size.
#
# @return [Integer] approximate heap bytes, or 0 when the figures are not
#   available on this Ruby implementation
def get_memory_usage
  return 0 unless defined?(GC.stat)

  constants = defined?(GC::INTERNAL_CONSTANTS) ? GC::INTERNAL_CONSTANTS : {}
  slot_size = constants[:RVALUE_SIZE] || constants[:BASE_SLOT_SIZE]
  page_size = constants[:HEAP_PAGE_SIZE] || (constants[:HEAP_PAGE_OBJ_LIMIT].to_i * slot_size.to_i)

  GC.stat(:heap_allocated_pages) * page_size
rescue StandardError
  # Best effort: unsupported GC.stat keys or missing constants report 0.
  0
end

#get_or_create_metric(name, type, tags) ⇒ Hash (private)

Get or create a metric

Parameters:

  • Metric name

  • Metric type

  • Metric tags

Returns:

  • Metric data



279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/a2a/monitoring/metrics_collector.rb', line 279

# Return the metric stored under the key for (+name+, +tags+), creating it
# first if it is missing.
#
# A new entry gets a type-specific initial state:
# - counters start with value 0;
# - histograms start with an empty :values list and zero :count and :sum;
# - every other field that does not apply to the type is nil.
#
# @param name [String] metric name
# @param type [Symbol] metric type constant
# @param tags [Hash] metric tags (stored by reference)
# @return [Hash] the metric data
def get_or_create_metric(name, type, tags)
  key = build_metric_key(name, tags)
  existing = @metrics[key]
  return existing if existing

  histogram = (type == HISTOGRAM)
  @metrics[key] = {
    name: name,
    type: type,
    tags: tags,
    value: (0 if type == COUNTER),
    values: ([] if histogram),
    count: (0 if histogram),
    sum: (0 if histogram),
    created_at: Time.now,
    last_updated: Time.now
  }
end

#histogram(name, value, **tags) ⇒ Object

Record a histogram value

Parameters:

  • Metric name

  • Value to record

  • Metric tags



86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/a2a/monitoring/metrics_collector.rb', line 86

# Record one observation for a histogram, then refresh its summary
# statistics and evaluate alert rules against the observed value. All of
# this happens while holding the collector's monitor.
#
# @param name [String] metric name
# @param value [Numeric] observed value
# @param tags [Hash] metric tags
def histogram(name, value, **tags)
  synchronize do
    entry = get_or_create_metric(name, HISTOGRAM, tags)
    entry[:values].push(value)
    entry[:count] = entry[:count] + 1
    entry[:sum] = entry[:sum] + value
    entry[:last_updated] = Time.now

    update_histogram_stats(entry) # min / max / avg / percentiles
    check_alerts(name, value, tags)
  end
end

#http_request(method:, path:, status:, duration:, **tags) ⇒ Object

Record HTTP request metrics

Parameters:

  • HTTP method

  • Request path

  • Response status

  • Request duration in seconds

  • Additional tags



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/a2a/monitoring/metrics_collector.rb', line 133

# Record the standard metrics for one HTTP request:
# - a request counter;
# - a latency histogram in milliseconds;
# - for status >= 400, an additional error counter.
#
# @param method [String, Symbol] HTTP method, upper-cased into the tags
# @param path [String] request path, normalised via #normalize_path
# @param status [Integer] response status
# @param duration [Numeric] request duration in seconds
# @param tags [Hash] extra tags; they override the standard ones on conflict
def http_request(method:, path:, status:, duration:, **tags)
  request_tags = {
    method: method.to_s.upcase,
    path: normalize_path(path),
    status: status
  }.merge(tags)

  increment("http_requests_total", **request_tags)
  histogram("http_request_duration_ms", duration * 1000, **request_tags)
  increment("http_requests_errors_total", **request_tags) if status >= 400
end

#increment(name, value: 1, **tags) ⇒ Object

Increment a counter metric

Parameters:

  • Metric name

  • (defaults to: 1)

    Value to add (default: 1)

  • Metric tags



141
142
143
# File 'lib/a2a/monitoring.rb', line 141

# Increment a counter by recording +value+ under "<name>_total".
#
# +value+ is an explicit keyword, matching the documented
# increment(name, value: 1, **tags) interface. Previously a `value:`
# argument was silently absorbed into the labels and the counter always
# recorded 1.
#
# @param name [String] counter name; "_total" is appended
# @param value [Numeric] amount to add (defaults to 1)
# @param labels [Hash] metric labels
# @return whatever #record returns
def increment(name, value: 1, **labels)
  record("#{name}_total", value, **labels)
end

#normalize_path(path) ⇒ String (private)

Normalize URL path for metrics

Parameters:

  • URL path

Returns:

  • Normalized path



379
380
381
382
383
# File 'lib/a2a/monitoring/metrics_collector.rb', line 379

# Collapse identifier-like path segments into placeholders, so that metrics
# are not tagged with one distinct path per resource.
#
# Only whole segments are replaced:
# - a UUID segment (either hex case) becomes "/:uuid";
# - an all-digit segment becomes "/:id".
# A segment ends at "/", "?", "#" or the end of the string. Segments that
# merely start with digits (e.g. "/123abc") are left intact instead of
# becoming "/:idabc".
#
# @param path [String] URL path
# @return [String] normalised path
def normalize_path(path)
  path.gsub(%r{/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}(?=[/?#]|\z)}i, "/:uuid")
      .gsub(%r{/\d+(?=[/?#]|\z)}, "/:id")
end

#percentile(values, percentile) ⇒ Numeric (private)

Calculate percentile from sorted values

Parameters:

  • Sorted array of values

  • Percentile (0.0 to 1.0)

Returns:

  • Percentile value



341
342
343
344
345
346
# File 'lib/a2a/monitoring/metrics_collector.rb', line 341

# Pick the element at index round(percentile * (n - 1)) of an array that is
# already sorted in ascending order.
#
# @param values [Array<Numeric>] values sorted ascending
# @param percentile [Float] fraction in 0.0..1.0
# @return [Numeric] the selected value, or 0 for an empty array
def percentile(values, percentile)
  if values.empty?
    0
  else
    last_index = values.size - 1
    values[(percentile * last_index).round]
  end
end

#record(name, value, **labels) ⇒ Object

Record a metric

Parameters:

  • Metric name

  • Metric value

  • Metric labels



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/a2a/monitoring.rb', line 122

# Record one metric sample in two steps:
# 1. Append it, with a float timestamp, to the in-memory buffer while
#    holding @mutex, then run #flush_if_needed.
# 2. Forward it to every registered backend.
#
# @param name [String]
# @param value [Numeric]
# @param labels [Hash]
# @return [Array] the backend list
def record(name, value, **labels)
  entry = { name: name, value: value, labels: labels, timestamp: Time.now.to_f }

  @mutex.synchronize do
    @metrics_buffer.push(entry)
    flush_if_needed
  end

  # Backends are invoked after the lock has been released.
  @backends.each do |backend|
    backend.record(name, value, **labels)
  end
end

#remove_exporter(exporter) ⇒ Object

Remove a metrics exporter

Parameters:

  • Exporter to remove



185
186
187
# File 'lib/a2a/monitoring/metrics_collector.rb', line 185

# Unregister every occurrence of +exporter+ while holding the collector's
# monitor.
#
# @return [Object, nil] the removed exporter, or nil if it was not registered
def remove_exporter(exporter)
  synchronize do
    @exporters.delete(exporter)
  end
end

#reset! ⇒ Object

Reset all metrics



254
255
256
257
258
259
260
# File 'lib/a2a/monitoring/metrics_collector.rb', line 254

# Clear all stored metrics and restart the start and last-flush clocks,
# while holding the collector's monitor.
#
# Alert rules and exporters are not touched.
#
# @return [Time] the new last-flush time
def reset!
  synchronize do
    @metrics.clear
    @start_time = Time.now
    @last_flush = Time.now
  end
end

#setup_backends ⇒ Object (private)



174
175
176
177
178
179
180
# File 'lib/a2a/monitoring.rb', line 174

# Register the default backends, in order:
# 1. a PrometheusBackend, but only when a Prometheus constant is defined;
# 2. a LoggingBackend built from @config.
#
# @return [Array] the backend list (result of the last #add_backend)
def setup_backends
  if defined?(Prometheus)
    add_backend(PrometheusBackend.new)
  end

  add_backend(LoggingBackend.new(@config))
end

#snapshot ⇒ Hash

Get current metrics snapshot

Returns:

  • Current metrics



214
215
216
# File 'lib/a2a/monitoring/metrics_collector.rb', line 214

# Return a copy of the metrics store made by #deep_copy while holding the
# collector's monitor.
#
# The copy is deep, unless #deep_copy has to fall back to a shallow dup.
#
# @return [Hash]
def snapshot
  synchronize do
    deep_copy(@metrics)
  end
end

#start_background_flush ⇒ Object (private)

Start background flush thread



388
389
390
391
392
393
394
395
396
397
# File 'lib/a2a/monitoring/metrics_collector.rb', line 388

# Spawn a thread that sleeps @flush_interval seconds and then calls
# #flush!, forever.
#
# A StandardError raised during an iteration is reported with Kernel#warn,
# and the loop carries on. The thread is stored in @flush_thread so that
# #stop can kill it.
#
# @return [Thread]
def start_background_flush
  @flush_thread = Thread.new do
    loop do
      begin
        sleep @flush_interval
        flush!
      rescue StandardError => e
        warn "Error in metrics flush thread: #{e.message}"
      end
    end
  end
end

#stop ⇒ Object

Stop the metrics collector



265
266
267
268
# File 'lib/a2a/monitoring/metrics_collector.rb', line 265

# Kill the background flush thread, if one is running, and forget it.
# Calling this again is harmless.
#
# @return [nil]
def stop
  thread = @flush_thread
  thread.kill if thread
  @flush_thread = nil
end

#summary ⇒ Hash

Get metrics summary

Returns:

  • Metrics summary



222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/a2a/monitoring/metrics_collector.rb', line 222

# Build an overview of the collector's state while holding its monitor.
#
# @return [Hash] with these keys:
#   - :total_metrics — number of stored metrics
#   - :uptime — seconds since start_time
#   - :last_flush — the last flush time
#   - :exporters — number of registered exporters
#   - :alert_rules — number of alert rules
#   - :memory_usage — the #get_memory_usage estimate
def summary
  synchronize do
    metric_count = @metrics.size
    uptime_seconds = Time.now - @start_time
    {
      total_metrics: metric_count,
      uptime: uptime_seconds,
      last_flush: @last_flush,
      exporters: @exporters.size,
      alert_rules: @alert_rules.size,
      memory_usage: get_memory_usage
    }
  end
end

#task_operation(operation:, task_type: nil, status: nil, duration: nil, **tags) ⇒ Object

Record task metrics

Parameters:

  • Task operation

  • (defaults to: nil)

    Type of task

  • (defaults to: nil)

    Task status

  • (defaults to: nil)

    Operation duration

  • Additional tags



158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/a2a/monitoring/metrics_collector.rb', line 158

# Record metrics for one task operation: an operation counter and, when a
# duration is given, a latency histogram in milliseconds.
#
# Tags whose value is nil are dropped before recording. This includes nil
# values supplied in +tags+.
#
# @param operation [String, Symbol] task operation
# @param task_type [String, nil] type of task
# @param status [String, Symbol, nil] task status
# @param duration [Numeric, nil] operation duration in seconds
# @param tags [Hash] additional tags; they override the standard ones on
#   conflict
def task_operation(operation:, task_type: nil, status: nil, duration: nil, **tags)
  labels = { operation: operation, task_type: task_type, status: status }
           .merge(tags)
           .reject { |_key, value| value.nil? }

  increment("task_operations_total", **labels)
  histogram("task_operation_duration_ms", duration * 1000, **labels) if duration
end

#time(name, **tags) { ... } ⇒ Object

Time a block of code

Parameters:

  • Metric name

  • Metric tags

Yields:

  • Block to time

Returns:

  • Result of the block



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/a2a/monitoring/metrics_collector.rb', line 108

# Time a block with the monotonic clock and record the outcome.
#
# The elapsed time always goes into the "<name>.duration" histogram, in
# milliseconds. The counter depends on the outcome:
# - "<name>.success" when the block returns;
# - "<name>.error" when it raises a StandardError, which is then re-raised.
#
# @param name [String] metric name prefix
# @param tags [Hash] metric tags
# @yield the code to time
# @return [Object] the block's result
def time(name, **tags)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  record_duration = lambda do
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    histogram("#{name}.duration", elapsed * 1000, **tags) # seconds -> milliseconds
  end

  begin
    outcome = yield
    record_duration.call
    increment("#{name}.success", **tags)
    outcome
  rescue StandardError
    record_duration.call
    increment("#{name}.error", **tags)
    raise
  end
end

#timing(name, duration, **labels) ⇒ Object

Record timing information

Parameters:

  • Timer name

  • Duration in seconds

  • Timer labels



149
150
151
# File 'lib/a2a/monitoring.rb', line 149

# Record a duration sample under "<name>_duration_seconds". The value is
# passed to #record unchanged.
#
# @param name [String] timer name
# @param duration [Numeric] duration in seconds
# @param labels [Hash] timer labels
def timing(name, duration, **labels)
  metric_name = "#{name}_duration_seconds"
  record(metric_name, duration, **labels)
end

#update_histogram_stats(metric) ⇒ Object (private)

Update histogram statistics

Parameters:

  • Histogram metric



314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/a2a/monitoring/metrics_collector.rb', line 314

# Refresh a histogram metric's summary statistics and bound the number of
# raw samples it retains.
#
# min, max and the percentiles come from the retained sample window. avg is
# the lifetime mean, :sum / :count. #histogram increments both of those
# counters on every sample, while the window is capped. Dividing the
# lifetime sum by the window size would therefore inflate the average once
# old samples are dropped.
#
# @param metric [Hash] histogram metric with :values, :sum and :count
# @return [Array, nil] the trimmed window when trimming happened, else nil
def update_histogram_stats(metric)
  window = metric[:values]
  sorted = window.sort
  return if sorted.empty?

  metric[:min] = sorted.first
  metric[:max] = sorted.last

  total = metric[:count].to_i
  total = sorted.size unless total.positive? # metric built without a :count
  metric[:avg] = metric[:sum].to_f / total

  metric[:p50] = percentile(sorted, 0.5)
  metric[:p95] = percentile(sorted, 0.95)
  metric[:p99] = percentile(sorted, 0.99)

  # Keep the 1000 most recent samples in insertion order. Trimming the
  # sorted copy would instead keep the 1000 largest and permanently discard
  # low values.
  return unless window.size > 1000

  metric[:values] = window.last(1000)
end