Class: Tasker::Telemetry::MetricsBackend
- Inherits: Object
- Includes: Singleton
- Defined in: lib/tasker/telemetry/metrics_backend.rb
Overview
MetricsBackend provides thread-safe, high-performance native metrics collection.
This class implements Tasker's core metrics storage system with thread-safe operations, automatic EventRouter integration, and zero-overhead metric collection. It follows the singleton pattern, consistent with HandlerFactory and Events::Publisher.
Phase 4.2.2.3 Enhancement: Hybrid Rails Cache Architecture. The backend now supports cache-agnostic dual storage that combines in-memory performance with Rails.cache persistence and cross-container coordination.
The backend supports three core metric types:
- Counter: Monotonically increasing values (requests, errors, completions)
- Gauge: Values that can go up/down (active connections, queue depth)
- Histogram: Statistical distributions (latencies, sizes, durations)
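As a rough illustration of how the three types differ, the following plain-Ruby sketch uses a Mutex for thread safety. SketchCounter, SketchGauge and SketchHistogram are hypothetical stand-ins, not Tasker's MetricTypes classes; the method names increment, set, decrement and observe only follow the descriptions and calls that appear on this page.

```ruby
# Hypothetical stand-ins illustrating the three metric semantics.
# These are NOT Tasker's MetricTypes classes.
class SketchCounter
  attr_reader :value

  def initialize
    @value = 0
    @lock = Mutex.new
  end

  # Counters only go up; this sketch rejects negative steps.
  def increment(by = 1)
    raise ArgumentError, 'counters cannot decrease' if by.negative?

    @lock.synchronize { @value += by }
  end
end

class SketchGauge
  attr_reader :value

  def initialize
    @value = 0
    @lock = Mutex.new
  end

  def set(new_value)
    @lock.synchronize { @value = new_value }
  end

  def increment(by = 1)
    @lock.synchronize { @value += by }
  end

  def decrement(by = 1)
    @lock.synchronize { @value -= by }
  end
end

class SketchHistogram
  def initialize
    @observations = []
    @lock = Mutex.new
  end

  # Records one observation (e.g. a latency in seconds).
  def observe(value)
    @lock.synchronize { @observations << value }
  end

  def count
    @lock.synchronize { @observations.size }
  end
end

requests = SketchCounter.new
3.times { requests.increment }

queue_depth = SketchGauge.new
queue_depth.set(10)
queue_depth.decrement(4)

latency = SketchHistogram.new
[0.1, 0.25, 0.4].each { |seconds| latency.observe(seconds) }
```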
Cache Store Compatibility:
- Redis/Memcached: Full coordination with atomic operations and locking
- File/Memory stores: Local-only mode with clear degradation messaging
- Automatic feature detection with appropriate strategy selection
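The capability-to-strategy mapping can be sketched as a small selector. The capability flags (distributed, atomic_increment) and the per-store table are assumptions made for illustration; Tasker's real detection lives in Tasker::CacheStrategy.detect, which this page does not show. Only the three strategy symbols come from the source.

```ruby
# Hypothetical capability flags; Tasker's real detection is
# Tasker::CacheStrategy.detect and may use other inputs and rules.
Capabilities = Struct.new(:distributed, :atomic_increment, keyword_init: true)

# Assumed classification of Rails' built-in stores, keyed by class short name.
STORE_CAPABILITIES = {
  'RedisCacheStore' => Capabilities.new(distributed: true, atomic_increment: true),
  'MemCacheStore' => Capabilities.new(distributed: true, atomic_increment: true),
  'FileStore' => Capabilities.new(distributed: false, atomic_increment: false),
  'MemoryStore' => Capabilities.new(distributed: false, atomic_increment: false)
}.freeze

def select_sync_strategy(caps)
  if caps.distributed && caps.atomic_increment
    :distributed_atomic # shared store with atomic counters
  elsif caps.distributed
    :distributed_basic  # shared store, read-modify-write only
  else
    :local_only         # process- or host-local store
  end
end

STORE_CAPABILITIES.each do |store, caps|
  puts "#{store}: #{select_sync_strategy(caps)}"
end
```

The table keys are the short names of Rails' built-in cache store classes (for example ActiveSupport::Cache::RedisCacheStore); whether Tasker classifies each store exactly this way is not confirmed here.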
Instance Attribute Summary
- #cache_capabilities ⇒ Hash (readonly): Cache capabilities detected at initialization.
- #cache_strategy ⇒ Tasker::CacheStrategy (readonly): Cache strategy for this instance.
- #created_at ⇒ Time (readonly): Backend creation timestamp for monitoring.
- #event_router ⇒ EventRouter (readonly): EventRouter instance for intelligent event routing.
- #instance_id ⇒ String (readonly): Unique instance identifier for distributed coordination.
- #metrics ⇒ Concurrent::Hash (readonly): Core metric registry storing all active metrics. Uses Concurrent::Hash for thread-safe operations without explicit locks.
- #sync_config ⇒ Hash (readonly): Configuration for cache synchronization.
- #sync_strategy ⇒ Symbol (readonly): Selected sync strategy based on cache capabilities.
Instance Method Summary
- #all_metrics ⇒ Hash: Get all registered metrics.
- #clear! ⇒ Integer: Clear all metrics (primarily for testing).
- #counter(name, **labels) ⇒ MetricTypes::Counter: Get or create a counter metric.
- #export ⇒ Hash: Export all metrics to a format suitable for monitoring systems.
- #export_distributed_metrics(include_instances: false) ⇒ Hash: Export metrics with distributed coordination when supported.
- #gauge(name, **labels) ⇒ MetricTypes::Gauge: Get or create a gauge metric.
- #handle_event(event_name, payload = {}) ⇒ Boolean: Handle an event from EventRouter and collect appropriate metrics.
- #histogram(name, buckets: nil, **labels) ⇒ MetricTypes::Histogram: Get or create a histogram metric.
- #initialize ⇒ MetricsBackend (constructor): Initialize the metrics backend.
- #register_event_router(router) ⇒ EventRouter: Register the EventRouter for intelligent routing.
- #stats ⇒ Hash: Get summary statistics about the metrics backend.
- #sync_to_cache! ⇒ Hash: Synchronize in-memory metrics to Rails.cache using the detected strategy.
Constructor Details
#initialize ⇒ MetricsBackend
Initialize the metrics backend
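Sets up thread-safe storage, detects cache capabilities, and configures synchronization for the hybrid architecture. The EventRouter is not attached here (@event_router starts as nil); it is registered later via #register_event_router. Called automatically via the singleton pattern.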
# File 'lib/tasker/telemetry/metrics_backend.rb', line 84

def initialize
  @metrics = Concurrent::Hash.new
  @event_router = nil
  @local_buffer = []
  @last_sync = Time.current
  @created_at = Time.current.freeze

  # Thread-safe metric creation lock
  @metric_creation_lock = Mutex.new

  # Use unified cache strategy for capability detection
  @cache_strategy = Tasker::CacheStrategy.detect
  @instance_id = @cache_strategy.instance_id
  @sync_strategy = @cache_strategy.coordination_mode

  # Extract capabilities for backward compatibility
  @cache_capabilities = @cache_strategy.export_capabilities
  @sync_config = configure_sync_parameters

  log_cache_strategy_selection
end
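Because the class includes Singleton, #initialize runs once, lazily, on the first call to .instance, and .new becomes private. The sketch below shows that wiring with Ruby's standard singleton library on a hypothetical SketchBackend class; none of Tasker's cache detection is reproduced.

```ruby
require 'singleton'

# Hypothetical stand-in that mimics only the singleton wiring of
# MetricsBackend; none of Tasker's real setup happens here.
class SketchBackend
  include Singleton

  # Class-level counter so we can observe how often #initialize runs.
  @init_calls = 0
  class << self
    attr_accessor :init_calls
  end

  attr_reader :metrics, :created_at

  def initialize
    self.class.init_calls += 1
    @metrics = {}
    @created_at = Time.now.freeze
  end
end

first = SketchBackend.instance
second = SketchBackend.instance
puts first.equal?(second)      # same object every time
puts SketchBackend.init_calls  # initialize ran only once

begin
  SketchBackend.new
rescue NoMethodError => e
  puts "new is private: #{e.class}"
end
```

With the real class, callers would use Tasker::Telemetry::MetricsBackend.instance rather than .new.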
Instance Attribute Details
#cache_capabilities ⇒ Hash (readonly)
Cache capabilities detected at initialization
# File 'lib/tasker/telemetry/metrics_backend.rb', line 61

def cache_capabilities
  @cache_capabilities
end
#cache_strategy ⇒ Tasker::CacheStrategy (readonly)
Cache strategy for this instance
# File 'lib/tasker/telemetry/metrics_backend.rb', line 77

def cache_strategy
  @cache_strategy
end
#created_at ⇒ Time (readonly)
Backend creation timestamp for monitoring
# File 'lib/tasker/telemetry/metrics_backend.rb', line 57

def created_at
  @created_at
end
#event_router ⇒ EventRouter (readonly)
EventRouter instance for intelligent event routing
# File 'lib/tasker/telemetry/metrics_backend.rb', line 53

def event_router
  @event_router
end
#instance_id ⇒ String (readonly)
Unique instance identifier for distributed coordination
# File 'lib/tasker/telemetry/metrics_backend.rb', line 69

def instance_id
  @instance_id
end
#metrics ⇒ Concurrent::Hash (readonly)
Core metric registry storing all active metrics. Uses Concurrent::Hash for thread-safe operations without explicit locks.
# File 'lib/tasker/telemetry/metrics_backend.rb', line 49

def metrics
  @metrics
end
#sync_config ⇒ Hash (readonly)
Configuration for cache synchronization
# File 'lib/tasker/telemetry/metrics_backend.rb', line 73

def sync_config
  @sync_config
end
#sync_strategy ⇒ Symbol (readonly)
Selected sync strategy based on cache capabilities
# File 'lib/tasker/telemetry/metrics_backend.rb', line 65

def sync_strategy
  @sync_strategy
end
Instance Method Details
#all_metrics ⇒ Hash
Get all registered metrics
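Returns a new Hash of all current metrics for export to monitoring systems like Prometheus. The Hash itself is a copy, but its values are the live metric objects rather than frozen snapshots of their values.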
# File 'lib/tasker/telemetry/metrics_backend.rb', line 319

def all_metrics
  @metrics.each_with_object({}) do |(key, metric), result|
    result[key] = metric
  end
end
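The each_with_object loop copies keys into a fresh Hash but shares the metric objects themselves. This sketch uses a plain Hash in place of Concurrent::Hash and a hypothetical Metric struct to show both halves of that behavior.

```ruby
# Hypothetical metric stand-in with a mutable value.
Metric = Struct.new(:value)

registry = { 'requests_total' => Metric.new(1) }

# Same copying pattern as #all_metrics.
snapshot = registry.each_with_object({}) do |(key, metric), result|
  result[key] = metric
end

snapshot['extra'] = Metric.new(0)      # new keys do not leak back
snapshot['requests_total'].value += 1  # but metric objects are shared

puts registry.key?('extra')
puts registry['requests_total'].value
```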
#clear! ⇒ Integer
Clear all metrics (primarily for testing)
# File 'lib/tasker/telemetry/metrics_backend.rb', line 359

def clear!
  cleared_count = @metrics.size
  @metrics.clear
  cleared_count
end
#counter(name, **labels) ⇒ MetricTypes::Counter
Get or create a counter metric
Counters are thread-safe and support only increment operations. They're ideal for counting events, requests, errors, etc.
# File 'lib/tasker/telemetry/metrics_backend.rb', line 130

def counter(name, **labels)
  get_or_create_metric(name, labels, :counter) do
    MetricTypes::Counter.new(name, labels: labels)
  end
end
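The private get_or_create_metric helper is not shown on this page. A common way to implement get-or-create for a metric registry is an unlocked fast-path lookup followed by a re-check under a Mutex, keyed on type, name and sorted labels. The sketch below assumes that design; SketchRegistry and its key format are hypothetical and may not match Tasker's code.

```ruby
# Hypothetical get-or-create registry; Tasker's private get_or_create_metric
# is not shown on this page and may be implemented differently.
class SketchRegistry
  # Minimal counter stand-in (increment is not synchronized here).
  Counter = Struct.new(:name, :labels, :value) do
    def increment(by = 1)
      self.value += by
    end
  end

  def initialize
    @metrics = {} # the real class uses Concurrent::Hash
    @lock = Mutex.new
  end

  def counter(name, **labels)
    get_or_create(name, labels, :counter) { Counter.new(name, labels, 0) }
  end

  def size
    @metrics.size
  end

  private

  # Sorting labels makes {a: 1, b: 2} and {b: 2, a: 1} map to the same key.
  def metric_key(name, labels, type)
    pairs = labels.sort.map { |k, v| "#{k}=#{v}" }.join(',')
    "#{type}:#{name}{#{pairs}}"
  end

  def get_or_create(name, labels, type)
    key = metric_key(name, labels, type)
    # Fast path: no lock when the metric already exists.
    existing = @metrics[key]
    return existing if existing

    # Slow path: re-check under the lock so concurrent callers
    # cannot create two metrics for the same key.
    @lock.synchronize { @metrics[key] ||= yield }
  end
end

reg = SketchRegistry.new
a = reg.counter('task_completed_total', namespace: 'billing', status: 'ok')
b = reg.counter('task_completed_total', status: 'ok', namespace: 'billing')
a.increment

threads = Array.new(8) { Thread.new { reg.counter('concurrent_total') } }
created = threads.map(&:value)
```

On MRI the unlocked fast-path read of a plain Hash is safe because of the global VM lock; on other Ruby implementations a concurrent structure such as Concurrent::Hash is the safer choice.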
#export ⇒ Hash
Export all metrics to a format suitable for monitoring systems
Provides a comprehensive export of all metric data including metadata, current values, and statistical information.
# File 'lib/tasker/telemetry/metrics_backend.rb', line 331

def export
  {
    timestamp: Time.current,
    backend_created_at: @created_at,
    total_metrics: @metrics.size,
    metrics: all_metrics.transform_values(&:to_h)
  }
end
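For a sense of the exported structure, this sketch builds the same Hash shape as #export from two hypothetical metrics. It assumes each metric's #to_h returns its name, type and value (a Struct provides that here), and it uses Time.now because Time.current comes from ActiveSupport.

```ruby
# Hypothetical metric stand-in; Struct#to_h yields { name:, type:, value: }.
ExportMetric = Struct.new(:name, :type, :value)

created_at = Time.now.freeze
metrics = {
  'task_completed_total' => ExportMetric.new('task_completed_total', :counter, 5),
  'workflow_active_tasks' => ExportMetric.new('workflow_active_tasks', :gauge, 2)
}

# Same shape as #export; Time.now replaces ActiveSupport's Time.current.
export = {
  timestamp: Time.now,
  backend_created_at: created_at,
  total_metrics: metrics.size,
  metrics: metrics.transform_values(&:to_h)
}

pp export[:metrics]
```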
#export_distributed_metrics(include_instances: false) ⇒ Hash
Export metrics with distributed coordination when supported
This method aggregates metrics across containers when the cache store supports it, and falls back to a local export (with a warning) for cache stores that cannot coordinate across containers.
# File 'lib/tasker/telemetry/metrics_backend.rb', line 228

def export_distributed_metrics(include_instances: false)
  case @sync_strategy
  when :distributed_atomic, :distributed_basic
    aggregate_from_distributed_cache(include_instances: include_instances)
  when :local_only
    export_local_metrics_with_warning
  else
    export # Fallback to standard local export
  end
end
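One plausible way to aggregate across containers is for each instance to write its counters under its own cache key and for the exporter to sum them. The key layout, value shape and aggregate_counters helper below are hypothetical; the real aggregate_from_distributed_cache is not shown on this page. Summing is only meaningful for counters; gauges and histograms would need their own merge rules.

```ruby
# Hypothetical per-instance counter snapshots, as they might be read back
# from a shared cache. Key layout and value shape are assumptions.
snapshots = {
  'tasker:metrics:host-a' => { 'task_completed_total' => 5, 'task_failed_total' => 1 },
  'tasker:metrics:host-b' => { 'task_completed_total' => 7 }
}

# Sums counters across instances; optionally keeps the per-instance view.
def aggregate_counters(snapshots, include_instances: false)
  totals = snapshots.each_value.with_object({}) do |counters, acc|
    counters.each { |name, value| acc[name] = acc.fetch(name, 0) + value }
  end

  result = { metrics: totals, instance_count: snapshots.size }
  if include_instances
    # Strip the assumed key prefix so only the instance id remains.
    result[:instances] = snapshots.transform_keys { |key| key.split(':').last }
  end
  result
end

aggregated = aggregate_counters(snapshots, include_instances: true)
pp aggregated
```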
#gauge(name, **labels) ⇒ MetricTypes::Gauge
Get or create a gauge metric
Gauges are thread-safe and support set, increment, and decrement operations. They're ideal for values that fluctuate like connections, queue depth, etc.
# File 'lib/tasker/telemetry/metrics_backend.rb', line 150

def gauge(name, **labels)
  get_or_create_metric(name, labels, :gauge) do
    MetricTypes::Gauge.new(name, labels: labels)
  end
end
#handle_event(event_name, payload = {}) ⇒ Boolean
Handle an event from EventRouter and collect appropriate metrics
This method is called by EventRouter when an event should be routed to the metrics backend. It automatically creates and updates metrics based on event type and payload, and returns false (without raising) when the payload is not a Hash or metric collection fails.
# File 'lib/tasker/telemetry/metrics_backend.rb', line 257

def handle_event(event_name, payload = {})
  return false unless payload.is_a?(Hash)

  case event_name
  when /\.started$/
    # Task/Step started events -> increment counter
    counter("#{extract_entity_type(event_name)}_started_total", **extract_labels(payload)).increment

  when /\.completed$/
    # Task/Step completed events -> counter + duration histogram
    entity_type = extract_entity_type(event_name)
    labels = extract_labels(payload)

    counter("#{entity_type}_completed_total", **labels).increment

    if (duration = extract_duration(payload))
      histogram("#{entity_type}_duration_seconds", **labels).observe(duration)
    end

  when /\.failed$/
    # Task/Step failed events -> error counter + duration histogram
    entity_type = extract_entity_type(event_name)
    labels = extract_labels(payload)

    counter("#{entity_type}_failed_total", **labels).increment

    if (duration = extract_duration(payload))
      histogram("#{entity_type}_duration_seconds", **labels).observe(duration)
    end

  when /\.cancelled$/
    # Task/Step cancelled events -> cancellation counter
    counter("#{extract_entity_type(event_name)}_cancelled_total", **extract_labels(payload)).increment

  when /workflow\.iteration/
    # Workflow iteration events -> gauge for active tasks
    gauge('workflow_active_tasks').set(payload[:active_task_count]) if payload[:active_task_count]

    if payload[:iteration_duration]
      histogram('workflow_iteration_duration_seconds').observe(payload[:iteration_duration])
    end

  when /system\.health/
    # System health events -> health gauges
    gauge('system_healthy_tasks').set(payload[:healthy_task_count]) if payload[:healthy_task_count]
    gauge('system_failed_tasks').set(payload[:failed_task_count]) if payload[:failed_task_count]
  end

  true
rescue StandardError => e
  # Fail gracefully - metrics collection should never break the application
  warn "MetricsBackend failed to handle event #{event_name}: #{e.message}"
  false
end
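The name-based routing in #handle_event can be exercised in isolation. This sketch returns the metric names an event would touch instead of updating metrics. It assumes extract_entity_type takes the first dot-separated segment and that the duration lives under payload[:duration]; both real helpers are private and not shown, so treat these as guesses. The workflow.iteration and system.health branches are omitted for brevity.

```ruby
# Hypothetical stand-in for the private helper: "task.completed" -> "task".
def extract_entity_type(event_name)
  event_name.split('.').first
end

# Returns the metric names #handle_event would touch for an event,
# assuming the duration is found under payload[:duration].
def metric_names_for(event_name, payload = {})
  entity = extract_entity_type(event_name)

  case event_name
  when /\.started$/
    ["#{entity}_started_total"]
  when /\.(completed|failed)$/
    # Regexp#=== in `when` sets the last match, so the capture is available.
    names = ["#{entity}_#{Regexp.last_match(1)}_total"]
    names << "#{entity}_duration_seconds" if payload[:duration]
    names
  when /\.cancelled$/
    ["#{entity}_cancelled_total"]
  else
    []
  end
end

p metric_names_for('step.completed', { duration: 1.2 })
```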
#histogram(name, buckets: nil, **labels) ⇒ MetricTypes::Histogram
Get or create a histogram metric
Histograms are thread-safe and provide statistical analysis of observed values. They're ideal for measuring durations, sizes, and distributions.
# File 'lib/tasker/telemetry/metrics_backend.rb', line 170

def histogram(name, buckets: nil, **labels)
  get_or_create_metric(name, labels, :histogram) do
    if buckets
      MetricTypes::Histogram.new(name, labels: labels, buckets: buckets)
    else
      MetricTypes::Histogram.new(name, labels: labels)
    end
  end
end
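The page does not say how MetricTypes::Histogram uses its buckets. A common convention, used by Prometheus, is cumulative "less than or equal" buckets plus an implicit +Inf bucket that counts every observation. SketchBucketHistogram below implements that convention with assumed default boundaries; it is not Tasker's implementation.

```ruby
# Hypothetical Prometheus-style histogram with cumulative buckets.
# DEFAULT_BUCKETS is an assumption, not Tasker's actual default.
class SketchBucketHistogram
  DEFAULT_BUCKETS = [0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10].freeze

  attr_reader :count, :sum

  def initialize(buckets: DEFAULT_BUCKETS)
    @bounds = buckets.sort.freeze
    @counts = Array.new(@bounds.size, 0)
    @count = 0
    @sum = 0.0
    @lock = Mutex.new
  end

  def observe(value)
    @lock.synchronize do
      @count += 1
      @sum += value
      # Cumulative: every bucket whose upper bound >= value is incremented.
      @bounds.each_with_index { |le, i| @counts[i] += 1 if value <= le }
    end
  end

  # Returns { upper_bound => cumulative_count }, plus +Inf for the total.
  def buckets
    @lock.synchronize do
      @bounds.zip(@counts).to_h.merge(Float::INFINITY => @count)
    end
  end
end

h = SketchBucketHistogram.new(buckets: [0.1, 0.5, 1.0])
[0.05, 0.2, 0.7, 3.0].each { |seconds| h.observe(seconds) }
p h.buckets
```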
#register_event_router(router) ⇒ EventRouter
Register the EventRouter for intelligent routing
This method is called by EventRouter during configuration to enable automatic metric collection based on routing decisions.
# File 'lib/tasker/telemetry/metrics_backend.rb', line 113

def register_event_router(router)
  @event_router = router
end
#stats ⇒ Hash
Get summary statistics about the metrics backend
# File 'lib/tasker/telemetry/metrics_backend.rb', line 343

def stats
  metric_types = all_metrics.values.group_by { |m| m.to_h[:type] }

  {
    total_metrics: @metrics.size,
    counter_metrics: metric_types[:counter]&.size || 0,
    gauge_metrics: metric_types[:gauge]&.size || 0,
    histogram_metrics: metric_types[:histogram]&.size || 0,
    backend_uptime: Time.current - @created_at,
    created_at: @created_at
  }
end
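group_by only creates keys for types that actually occur, which is why #stats falls back with &.size || 0. A minimal sketch with a hypothetical StatsMetric struct standing in for real metric objects:

```ruby
# Hypothetical metric stand-in; Struct#to_h provides the :type key.
StatsMetric = Struct.new(:name, :type)

metrics = [
  StatsMetric.new('task_completed_total', :counter),
  StatsMetric.new('task_failed_total', :counter),
  StatsMetric.new('task_duration_seconds', :histogram)
]

# group_by creates no :gauge key here because no gauges exist.
by_type = metrics.group_by { |m| m.to_h[:type] }

stats = {
  total_metrics: metrics.size,
  counter_metrics: by_type[:counter]&.size || 0,
  gauge_metrics: by_type[:gauge]&.size || 0, # nil&.size is nil, so 0
  histogram_metrics: by_type[:histogram]&.size || 0
}

pp stats
```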
#sync_to_cache! ⇒ Hash
Synchronize in-memory metrics to Rails.cache using detected strategy
This method implements the core cache synchronization logic that adapts to the available Rails.cache store capabilities.
# File 'lib/tasker/telemetry/metrics_backend.rb', line 190

def sync_to_cache!
  return { success: false, error: 'Rails.cache not available' } unless rails_cache_available?

  start_time = Time.current

  case @sync_strategy
  when :distributed_atomic
    result = sync_with_atomic_operations
  when :distributed_basic
    result = sync_with_read_modify_write
  when :local_only
    result = sync_to_local_cache
  else
    return { success: false, error: "Unknown sync strategy: #{@sync_strategy}" }
  end

  final_result = result.merge(
    duration: Time.current - start_time,
    timestamp: Time.current.iso8601,
    instance_id: @instance_id
  )

  # Coordinate with export system
  coordinate_cache_sync(final_result)

  final_result
rescue StandardError => e
  log_sync_error(e)
  { success: false, error: e.message, timestamp: Time.current.iso8601 }
end
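The dispatch-then-annotate flow of #sync_to_cache! can be sketched with a table of handlers. The lambdas stand in for the private sync_* methods, which are not shown, and keys such as synced_metrics are invented for illustration. Unlike the source, which subtracts two Time.current readings, this sketch times the call with a monotonic clock so that wall-clock adjustments cannot skew the duration.

```ruby
require 'time' # Time#iso8601

# Hypothetical handlers standing in for the private sync_* methods.
SYNC_HANDLERS = {
  distributed_atomic: -> { { success: true, synced_metrics: 3 } },
  distributed_basic: -> { { success: true, synced_metrics: 3 } },
  local_only: -> { { success: true, synced_metrics: 3, local_only: true } }
}.freeze

def sketch_sync(strategy, instance_id:, handlers: SYNC_HANDLERS)
  handler = handlers[strategy]
  return { success: false, error: "Unknown sync strategy: #{strategy}" } unless handler

  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = handler.call

  # Annotate the handler's result, as the source does with merge.
  result.merge(
    duration: Process.clock_gettime(Process::CLOCK_MONOTONIC) - start,
    timestamp: Time.now.utc.iso8601,
    instance_id: instance_id
  )
rescue StandardError => e
  # Mirror the source: report failures as data instead of raising.
  { success: false, error: e.message, timestamp: Time.now.utc.iso8601 }
end

pp sketch_sync(:distributed_atomic, instance_id: 'host-a')
```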