Class: A2A::Monitoring::MetricsCollector
- Inherits: Object
  - Object
    - A2A::Monitoring::MetricsCollector
- Includes: MonitorMixin
- Defined in:
  - lib/a2a/monitoring.rb
  - lib/a2a/monitoring/metrics_collector.rb
Overview
Metrics collection interface
Constant Summary
Metric types:
- COUNTER = :counter
- GAUGE = :gauge
- HISTOGRAM = :histogram
- TIMER = :timer
Instance Attribute Summary collapse
-
#metrics ⇒ Object
readonly
Returns the value of attribute metrics.
-
#start_time ⇒ Object
readonly
Returns the value of attribute start_time.
Instance Method Summary collapse
-
#add_alert(name, metric:, condition:, threshold:, &callback) ⇒ Object
Add an alert rule.
-
#add_backend(backend) ⇒ Object
Add metrics backend.
-
#add_exporter(exporter) ⇒ Object
Add a metrics exporter.
-
#build_metric_key(name, tags) ⇒ String
private
Build metric key from name and tags.
-
#check_alerts(metric_name, value, tags) ⇒ Object
private
Check alert rules.
-
#cleanup_old_metrics ⇒ Object
private
Clean up old metrics.
-
#current_metrics ⇒ Array<Hash>
Get current metrics.
-
#deep_copy(obj) ⇒ Object
private
Deep copy a hash.
-
#flush! ⇒ Object
Flush metrics to exporters.
-
#flush_if_needed ⇒ Object
private
Log the metrics buffer size at debug level once the buffer holds 100 or more entries.
-
#gauge(name, value, **tags) ⇒ Object
Set a gauge metric value.
-
#get_memory_usage ⇒ Integer
private
Get current memory usage.
-
#get_or_create_metric(name, type, tags) ⇒ Hash
private
Get or create a metric.
-
#histogram(name, value, **tags) ⇒ Object
Record a histogram value.
-
#http_request(method:, path:, status:, duration:, **tags) ⇒ Object
Record HTTP request metrics.
-
#increment(name, value: 1, **tags) ⇒ Object
Increment a counter metric.
-
#initialize(flush_interval: 60, retention_period: 3600) ⇒ MetricsCollector
constructor
Initialize metrics collector.
-
#normalize_path(path) ⇒ String
private
Normalize URL path for metrics.
-
#percentile(values, percentile) ⇒ Numeric
private
Calculate percentile from sorted values.
-
#record(name, value, **labels) ⇒ Object
Record a metric.
-
#remove_exporter(exporter) ⇒ Object
Remove a metrics exporter.
-
#reset! ⇒ Object
Reset all metrics.
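-
#setup_backends ⇒ Object
private
Register the default metrics backends (a Prometheus backend when `Prometheus` is defined, plus a logging backend).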
-
#snapshot ⇒ Hash
Get current metrics snapshot.
-
#start_background_flush ⇒ Object
private
Start background flush thread.
-
#stop ⇒ Object
Stop the metrics collector.
-
#summary ⇒ Hash
Get metrics summary.
-
#task_operation(operation:, task_type: nil, status: nil, duration: nil, **tags) ⇒ Object
Record task metrics.
-
#time(name, **tags) { ... } ⇒ Object
Time a block of code.
-
#timing(name, duration, **labels) ⇒ Object
Record timing information.
-
#update_histogram_stats(metric) ⇒ Object
private
Update histogram statistics.
Constructor Details
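#initialize(flush_interval: 60, retention_period: 3600) ⇒ MetricsCollector
Initialize metrics collector.
Note: the source listing below is the `initialize(config)` definition from lib/a2a/monitoring.rb. It does not match the keyword-argument signature above, which presumably comes from lib/a2a/monitoring/metrics_collector.rb.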
109 110 111 112 113 114 115 116 |
# File 'lib/a2a/monitoring.rb', line 109
# Initialize the metrics collector.
#
# Stores the configuration, creates an empty backend list and an empty
# in-memory metrics buffer guarded by a Mutex, then registers the default
# backends via #setup_backends.
#
# @param config [Object] collector configuration; kept in @config, handed to
#   the logging backend, and read as `config.logger` by #flush_if_needed
def initialize(config)
  @config = config
  @backends = []
  @metrics_buffer = []
  @mutex = Mutex.new
  setup_backends
end
Instance Attribute Details
#metrics ⇒ Object (readonly)
Returns the value of attribute metrics.
27 28 29 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 27
# Read-only accessor for the collector's metric store.
#
# @return [Object] the value of the @metrics attribute
def metrics
  @metrics
end
#start_time ⇒ Object (readonly)
Returns the value of attribute start_time.
27 28 29 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 27
# Read-only accessor for the time the collector started (reset by #reset!).
#
# @return [Object] the value of the @start_time attribute
def start_time
  @start_time
end
Instance Method Details
#add_alert(name, metric:, condition:, threshold:, &callback) ⇒ Object
Add an alert rule
197 198 199 200 201 202 203 204 205 206 207 208 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 197
# Add an alert rule.
#
# The rule is evaluated by #check_alerts whenever a value is recorded for
# +metric+. New rules start with +last_fired+ set to nil.
#
# @param name [Object] alert name, passed back to the callback
# @param metric [Object] name of the metric the rule watches
# @param condition [Symbol] one of :gt, :gte, :lt, :lte, :eq
# @param threshold [Numeric] value the recorded value is compared against
# @yield [name, metric_name, value, tags] called when the rule fires
def add_alert(name, metric:, condition:, threshold:, &callback)
  synchronize do
    @alert_rules << {
      name: name,
      metric: metric,
      condition: condition,
      threshold: threshold,
      callback: callback,
      last_fired: nil
    }
  end
end
#add_backend(backend) ⇒ Object
Add metrics backend
155 156 157 |
# File 'lib/a2a/monitoring.rb', line 155
# Add a metrics backend.
#
# Every subsequent #record call forwards the metric to the backend's
# +record+ method.
#
# @param backend [#record] backend to register
# @return [Array] the backend list
def add_backend(backend)
  @backends << backend
end
#add_exporter(exporter) ⇒ Object
Add a metrics exporter
177 178 179 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 177
# Add a metrics exporter.
#
# The exporter list is modified under the collector's monitor.
#
# @param exporter [Object] exporter to register
# @return [Array] the exporter list
def add_exporter(exporter)
  synchronize { @exporters << exporter }
end
#build_metric_key(name, tags) ⇒ String (private)
Build metric key from name and tags
301 302 303 304 305 306 307 308 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 301
# Build the storage key for a metric from its name and tags.
#
# Tags are sorted, so the same tag set always yields the same key regardless
# of the order the caller supplied them in.
#
# @param name [String] metric name
# @param tags [Hash] tag key/value pairs
# @return [String] +name+ when +tags+ is empty, otherwise "name{k1=v1,k2=v2}"
def build_metric_key(name, tags)
  return name if tags.empty?

  tag_string = tags.sort.map { |key, value| "#{key}=#{value}" }.join(",")
  "#{name}{#{tag_string}}"
end
#check_alerts(metric_name, value, tags) ⇒ Object (private)
Check alert rules
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 354
# Check alert rules against a newly recorded value.
#
# Every rule watching +metric_name+ compares +value+ with its threshold using
# its condition (:gt, :gte, :lt, :lte, :eq; anything else never fires). A
# matching rule fires its callback at most once per 60 seconds. Exceptions
# raised by a callback are reported with +warn+ instead of propagating, so a
# faulty alert handler cannot break the metric recording that triggered it.
#
# @param metric_name [String] name of the metric that was recorded
# @param value [Numeric] the recorded value
# @param tags [Hash] tags of the recorded metric, passed to the callback
def check_alerts(metric_name, value, tags)
  @alert_rules.each do |rule|
    next unless rule[:metric] == metric_name

    threshold = rule[:threshold]
    should_fire =
      case rule[:condition]
      when :gt then value > threshold
      when :gte then value >= threshold
      when :lt then value < threshold
      when :lte then value <= threshold
      when :eq then value == threshold
      else false
      end
    next unless should_fire
    # Cooldown: suppress re-firing within 60 seconds of the last firing.
    next if rule[:last_fired] && Time.now - rule[:last_fired] <= 60

    rule[:last_fired] = Time.now
    begin
      rule[:callback]&.call(rule[:name], metric_name, value, tags)
    rescue StandardError => e
      warn "Error in metrics alert callback #{rule[:name]}: #{e.message}"
    end
  end
end
#cleanup_old_metrics ⇒ Object (private)
Clean up old metrics
402 403 404 405 406 407 408 409 410 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 402
# Clean up old metrics.
#
# Removes every metric whose +:last_updated+ time is older than
# @retention_period seconds ago. The store is modified under the monitor.
#
# @return [Hash, nil] the metric store if anything was removed, else nil
def cleanup_old_metrics
  cutoff_time = Time.now - @retention_period
  synchronize do
    @metrics.reject! { |_key, metric| metric[:last_updated] < cutoff_time }
  end
end
#current_metrics ⇒ Array<Hash>
Get current metrics
161 162 163 |
# File 'lib/a2a/monitoring.rb', line 161
# Get current metrics.
#
# @return [Array<Hash>] a shallow copy of the buffered metric records, taken
#   under the buffer mutex (the record hashes themselves are shared)
def current_metrics
  @mutex.synchronize { @metrics_buffer.dup }
end
#deep_copy(obj) ⇒ Object (private)
Deep copy a hash
431 432 433 434 435 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 431
# Deep copy an object (typically the metrics hash).
#
# Uses a Marshal round-trip on the collector's own in-memory data. If the
# object cannot be marshalled (e.g. it contains a Proc), falls back to a
# shallow +dup+.
#
# @param obj [Object] object to copy
# @return [Object] the copy
def deep_copy(obj)
  Marshal.load(Marshal.dump(obj))
rescue StandardError
  obj.dup
end
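#flush! ⇒ Object
Flush metrics to exporters.
Note: the listing below (from lib/a2a/monitoring.rb) only clears the in-memory metrics buffer; it does not export anything.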
166 167 168 169 170 |
# File 'lib/a2a/monitoring.rb', line 166
# Flush the metrics buffer.
#
# This implementation exports nothing. It discards all buffered metric
# records under the buffer mutex.
#
# @return [Array] the (now empty) buffer
def flush!
  @mutex.synchronize do
    @metrics_buffer.clear
  end
end
#flush_if_needed ⇒ Object (private)
Log the metrics buffer size at debug level once the buffer holds 100 or more entries.
182 183 184 185 186 187 |
# File 'lib/a2a/monitoring.rb', line 182
# Report a large metrics buffer.
#
# Once the buffer holds 100 or more records, logs its size at debug level
# through `@config.logger` (if a logger is configured). It does not clear or
# persist the buffer. Called from #record while the buffer mutex is held.
def flush_if_needed
  return unless @metrics_buffer.size >= 100

  # In a real implementation, you might want to flush to persistent storage
  @config.logger&.debug("Metrics buffer size: #{@metrics_buffer.size}")
end
#gauge(name, value, **tags) ⇒ Object
Set a gauge metric value
70 71 72 73 74 75 76 77 78 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 70
# Set a gauge metric value.
#
# Replaces the gauge's current value, stamps +:last_updated+, and runs the
# alert rules for +name+. All of this happens under the collector's monitor.
#
# @param name [String] metric name
# @param value [Numeric] new gauge value
# @param tags [Hash] tags identifying the series
def gauge(name, value, **tags)
  synchronize do
    metric = get_or_create_metric(name, GAUGE, tags)
    metric[:value] = value
    metric[:last_updated] = Time.now

    check_alerts(name, value, tags)
  end
end
#get_memory_usage ⇒ Integer (private)
Get current memory usage
416 417 418 419 420 421 422 423 424 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 416
# Approximate the size of the Ruby object heap in bytes.
#
# Computed as allocated heap pages (from GC.stat) times the heap page size.
# The page size is not a GC.stat key: the previous GC.stat(:heap_page_size)
# always raised and made this method return 0. It is therefore read from
# GC::INTERNAL_CONSTANTS, using HEAP_PAGE_SIZE when present and otherwise
# objects-per-page times slot size. This measures the object heap, not the
# process RSS.
#
# @return [Integer] approximate heap size in bytes, or 0 when unavailable
def get_memory_usage
  constants = defined?(GC::INTERNAL_CONSTANTS) ? GC::INTERNAL_CONSTANTS : {}
  page_size = constants[:HEAP_PAGE_SIZE]
  unless page_size
    slot_size = constants[:RVALUE_SIZE] || constants[:BASE_SLOT_SIZE]
    objects_per_page = constants[:HEAP_PAGE_OBJ_LIMIT]
    page_size = slot_size * objects_per_page if slot_size && objects_per_page
  end
  return 0 unless page_size

  GC.stat(:heap_allocated_pages) * page_size
rescue StandardError
  0
end
#get_or_create_metric(name, type, tags) ⇒ Hash (private)
Get or create a metric
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 279
# Get or create a metric.
#
# Looks the metric up by #build_metric_key(name, tags) and creates it on
# first use. Counters start at value 0. Histograms start with an empty
# +:values+ list and zero +:count+/+:sum+. Other types start with nil fields.
# Callers are expected to hold the collector's monitor.
#
# @param name [String] metric name
# @param type [Symbol] one of the metric type constants
# @param tags [Hash] tags identifying the series
# @return [Hash] the (possibly new) metric record
def get_or_create_metric(name, type, tags)
  key = build_metric_key(name, tags)
  histogram = type == HISTOGRAM
  @metrics[key] ||= {
    name: name,
    type: type,
    tags: tags,
    value: type == COUNTER ? 0 : nil,
    values: histogram ? [] : nil,
    count: histogram ? 0 : nil,
    sum: histogram ? 0 : nil,
    created_at: Time.now,
    last_updated: Time.now
  }
end
#histogram(name, value, **tags) ⇒ Object
Record a histogram value
86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 86
# Record a histogram value.
#
# Appends +value+ to the histogram's samples, updates its count and sum,
# recomputes summary statistics via #update_histogram_stats, and runs the
# alert rules for +name+. All of this happens under the collector's monitor.
#
# @param name [String] metric name
# @param value [Numeric] observed value
# @param tags [Hash] tags identifying the series
def histogram(name, value, **tags)
  synchronize do
    metric = get_or_create_metric(name, HISTOGRAM, tags)
    metric[:values] << value
    metric[:count] += 1
    metric[:sum] += value
    metric[:last_updated] = Time.now

    # Calculate percentiles
    update_histogram_stats(metric)

    check_alerts(name, value, tags)
  end
end
#http_request(method:, path:, status:, duration:, **tags) ⇒ Object
Record HTTP request metrics
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 133
# Record HTTP request metrics.
#
# Increments "http_requests_total" and records the duration in
# "http_request_duration_ms". For status codes of 400 and above it also
# increments "http_requests_errors_total". Every metric is tagged with the
# upper-cased method, the normalized path, the status, and any extra tags.
# Extra tags override the built-in ones on key collision.
#
# @param method [#to_s] HTTP method
# @param path [String] request path (IDs and UUIDs are normalized)
# @param status [Integer, String] HTTP status code
# @param duration [Numeric] request duration in seconds
# @param tags [Hash] additional tags
def http_request(method:, path:, status:, duration:, **tags)
  tags = {
    method: method.to_s.upcase,
    path: normalize_path(path),
    status: status,
    **tags
  }

  increment("http_requests_total", **tags)
  histogram("http_request_duration_ms", duration * 1000, **tags)

  # Track error rates. +to_i+ accepts both Integer and String status codes;
  # comparing a String status with 400 would raise.
  return unless status.to_i >= 400

  increment("http_requests_errors_total", **tags)
end
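#increment(name, value: 1, **tags) ⇒ Object
Increment a counter metric.
Note: the listing below is the definition from lib/a2a/monitoring.rb, which delegates to #record with the metric name suffixed by `_total`.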
141 142 143 |
# File 'lib/a2a/monitoring.rb', line 141
# Increment a counter metric.
#
# Records +value+ under "<name>_total" via #record. The +value:+ keyword
# matches the documented signature increment(name, value: 1, **tags); it
# defaults to 1, so existing callers are unaffected.
#
# @param name [String] counter name (without the "_total" suffix)
# @param value [Numeric] amount to add
# @param labels [Hash] labels attached to the record
def increment(name, value: 1, **labels)
  record("#{name}_total", value, **labels)
end
#normalize_path(path) ⇒ String (private)
Normalize URL path for metrics
379 380 381 382 383 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 379
# Normalize a URL path for use as a metric tag.
#
# Whole path segments that are UUIDs (any letter case) become "/:uuid".
# Whole segments that are purely numeric become "/:id". Matches must end at a
# segment boundary ("/", "?", "#" or end of string), so mixed segments such
# as "/123abc" or "/2024-01-01" are left intact instead of being partially
# rewritten.
#
# @param path [String, nil] request path
# @return [String] the normalized path
def normalize_path(path)
  # Replace IDs and UUIDs with placeholders
  path.to_s
      .gsub(%r{/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}(?=[/?#]|\z)}i, "/:uuid")
      .gsub(%r{/\d+(?=[/?#]|\z)}, "/:id")
end
#percentile(values, percentile) ⇒ Numeric (private)
Calculate percentile from sorted values
341 342 343 344 345 346 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 341
# Calculate a percentile from sorted values (nearest-rank, rounded index).
#
# The fraction is clamped to [0, 1]. Without the clamp, a negative fraction
# produced a negative index that wrapped around to the largest values, and a
# fraction above 1 returned nil.
#
# @param values [Array<Numeric>] values sorted ascending
# @param percentile [Numeric] fraction in [0, 1], e.g. 0.95
# @return [Numeric] the selected value, or 0 when +values+ is empty
def percentile(values, percentile)
  return 0 if values.empty?

  fraction = percentile.clamp(0.0, 1.0)
  values[(fraction * (values.size - 1)).round]
end
#record(name, value, **labels) ⇒ Object
Record a metric
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/a2a/monitoring.rb', line 122
# Record a metric.
#
# Appends a record with name, value, labels and a Float epoch timestamp to
# the in-memory buffer, then calls #flush_if_needed; both happen under the
# buffer mutex. Afterwards, outside the mutex, the metric is forwarded to
# every registered backend.
#
# @param name [String] metric name
# @param value [Numeric] metric value
# @param labels [Hash] labels attached to the record
def record(name, value, **labels)
  metric = {
    name: name,
    value: value,
    labels: labels,
    timestamp: Time.now.to_f
  }

  @mutex.synchronize do
    @metrics_buffer << metric
    flush_if_needed
  end

  @backends.each { |backend| backend.record(name, value, **labels) }
end
#remove_exporter(exporter) ⇒ Object
Remove a metrics exporter
185 186 187 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 185
# Remove a metrics exporter.
#
# @param exporter [Object] exporter to unregister
# @return [Object, nil] the removed exporter, or nil if it was not registered
def remove_exporter(exporter)
  synchronize { @exporters.delete(exporter) }
end
#reset! ⇒ Object
Reset all metrics
254 255 256 257 258 259 260 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 254
# Reset all metrics.
#
# Clears the metric store and restarts the uptime and last-flush clocks.
# Exporters and alert rules are kept. Runs under the collector's monitor.
def reset!
  synchronize do
    @metrics.clear
    @start_time = Time.now
    @last_flush = Time.now
  end
end
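#setup_backends ⇒ Object (private)
Register the default metrics backends (a Prometheus backend when `Prometheus` is defined, plus a logging backend).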
174 175 176 177 178 179 180 |
# File 'lib/a2a/monitoring.rb', line 174
# Register the default metrics backends.
#
# Adds a PrometheusBackend only when the `Prometheus` constant is defined,
# and always adds a LoggingBackend built from @config.
def setup_backends
  # Add Prometheus backend if available
  add_backend(PrometheusBackend.new) if defined?(Prometheus)

  # Add logging backend
  add_backend(LoggingBackend.new(@config))
end
#snapshot ⇒ Hash
Get current metrics snapshot
214 215 216 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 214
# Get a current metrics snapshot.
#
# @return [Hash] a deep copy of the metric store taken under the monitor, so
#   callers can inspect or mutate it without affecting live metrics
def snapshot
  synchronize { deep_copy(@metrics) }
end
#start_background_flush ⇒ Object (private)
Start background flush thread
388 389 390 391 392 393 394 395 396 397 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 388
# Start the background flush thread.
#
# The thread sleeps @flush_interval seconds, calls #flush!, and repeats until
# it is killed (see #stop). The thread is stored in @flush_thread.
def start_background_flush
  @flush_thread = Thread.new do
    loop do
      sleep @flush_interval
      flush!
    rescue StandardError => e
      # Keep the thread alive: a failed flush is reported and retried on the
      # next interval.
      warn "Error in metrics flush thread: #{e.message}"
    end
  end
end
#stop ⇒ Object
Stop the metrics collector
265 266 267 268 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 265
# Stop the metrics collector.
#
# Kills the background flush thread (if any) and waits for it to terminate.
# Thread#kill alone is asynchronous, so without the join a flush could still
# be running after this method returned. Safe to call more than once.
#
# @return [nil]
def stop
  thread = @flush_thread
  @flush_thread = nil
  return if thread.nil?

  thread.kill
  # Joining the current thread raises ThreadError; killing the current
  # thread does not return anyway, so this guard is purely defensive.
  thread.join unless thread.equal?(Thread.current)
  nil
end
#summary ⇒ Hash
Get metrics summary
222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 222
# Get a metrics summary.
#
# @return [Hash] counts of metrics, exporters and alert rules, uptime in
#   seconds since @start_time, the last flush time, and #get_memory_usage,
#   all gathered under the collector's monitor
def summary
  synchronize do
    {
      total_metrics: @metrics.size,
      uptime: Time.now - @start_time,
      last_flush: @last_flush,
      exporters: @exporters.size,
      alert_rules: @alert_rules.size,
      memory_usage: get_memory_usage
    }
  end
end
#task_operation(operation:, task_type: nil, status: nil, duration: nil, **tags) ⇒ Object
Record task metrics
158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 158
# Record task metrics.
#
# Increments "task_operations_total". When a duration is given, it also
# records it in "task_operation_duration_ms". Tags combine operation,
# task_type, status and any extra tags; extra tags win on collision, and
# nil-valued tags are dropped.
#
# @param operation [Object] operation name
# @param task_type [Object, nil] optional task type tag
# @param status [Object, nil] optional status tag
# @param duration [Numeric, nil] duration in seconds, if measured
# @param tags [Hash] additional tags
def task_operation(operation:, task_type: nil, status: nil, duration: nil, **tags)
  tags = {
    operation: operation,
    task_type: task_type,
    status: status,
    **tags
  }.compact

  increment("task_operations_total", **tags)
  return unless duration

  histogram("task_operation_duration_ms", duration * 1000, **tags)
end
#time(name, **tags) { ... } ⇒ Object
Time a block of code
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 108
# Time a block of code.
#
# Measures the block with the monotonic clock and records the elapsed
# milliseconds in the "<name>.duration" histogram. It then increments
# "<name>.success", or "<name>.error" when the block raises a StandardError,
# in which case the error is re-raised.
#
# @param name [String] metric name prefix
# @param tags [Hash] tags attached to every recorded metric
# @yield the code to time
# @return [Object] the block's return value
# @raise [StandardError] whatever the block raises, after it is recorded
def time(name, **tags)
  # Named +started_at+ so it does not shadow the +start_time+ reader.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  record_duration = lambda do
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    histogram("#{name}.duration", elapsed * 1000, **tags) # Convert to milliseconds
  end

  begin
    result = yield
  rescue StandardError
    record_duration.call
    increment("#{name}.error", **tags)
    raise
  end

  # Recorded outside the rescue: a failure while recording the success
  # metrics must not be re-counted as an error of the timed block.
  record_duration.call
  increment("#{name}.success", **tags)
  result
end
#timing(name, duration, **labels) ⇒ Object
Record timing information
149 150 151 |
# File 'lib/a2a/monitoring.rb', line 149
# Record timing information.
#
# Records +duration+ via #record under "<name>_duration_seconds"; the value
# is passed through unchanged, so callers presumably supply seconds.
#
# @param name [String] metric name prefix
# @param duration [Numeric] measured duration
# @param labels [Hash] labels attached to the record
def timing(name, duration, **labels)
  record("#{name}_duration_seconds", duration, **labels)
end
#update_histogram_stats(metric) ⇒ Object (private)
Update histogram statistics
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 |
# File 'lib/a2a/monitoring/metrics_collector.rb', line 314
# Update histogram statistics.
#
# Computes min, max and p50/p95/p99 over the retained sample window. The
# average is computed as all-time +:sum+ divided by all-time +:count+; these
# agree with each other, whereas dividing +:sum+ by the trimmed window size
# inflated the average once more than 1000 samples had been seen. The window
# is then trimmed to the 1000 most recently inserted samples. The old code
# trimmed the *sorted* list and so kept the 1000 largest values, which biased
# later percentiles upward.
#
# @param metric [Hash] histogram record with :values, :sum and :count
def update_histogram_stats(metric)
  window = metric[:values]
  return if window.empty?

  sorted = window.sort
  metric[:min] = sorted.first
  metric[:max] = sorted.last

  total_count = metric[:count]
  total_count = sorted.size unless total_count&.positive?
  metric[:avg] = metric[:sum].to_f / total_count

  # Calculate percentiles
  metric[:p50] = percentile(sorted, 0.5)
  metric[:p95] = percentile(sorted, 0.95)
  metric[:p99] = percentile(sorted, 0.99)

  # Keep only recent values (insertion order, not sorted) to prevent memory growth
  metric[:values] = window.last(1000) if window.size > 1000
end