Class: Tasker::Telemetry::MetricsBackend

Inherits:
Object
Includes:
Singleton
Defined in:
lib/tasker/telemetry/metrics_backend.rb

Overview

MetricsBackend provides thread-safe, high-performance native metrics collection.

This class implements Tasker's core metrics storage system with thread-safe operations, automatic EventRouter integration, and low-overhead metric collection. It follows the singleton pattern, consistent with HandlerFactory and Events::Publisher.

Phase 4.2.2.3 Enhancement: Hybrid Rails Cache Architecture. The backend now supports cache-agnostic dual storage, combining in-memory performance with Rails.cache persistence and cross-container coordination.

The backend supports three core metric types:

  • Counter: Monotonically increasing values (requests, errors, completions)
  • Gauge: Values that can go up/down (active connections, queue depth)
  • Histogram: Statistical distributions (latencies, sizes, durations)

Cache Store Compatibility:

  • Redis/Memcached: Full coordination with atomic operations and locking
  • File/Memory stores: Local-only mode with clear degradation messaging
  • Automatic feature detection with appropriate strategy selection
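
The capability-based selection described above might look roughly like the following. This is a minimal sketch, not the actual Tasker::CacheStrategy.detect logic: the select_sync_strategy method and the :distributed and :atomic_increment capability keys are hypothetical.

```ruby
# Hypothetical sketch: map detected cache capabilities to a sync strategy.
# The capability keys (:distributed, :atomic_increment) are assumptions.
def select_sync_strategy(capabilities)
  if capabilities[:distributed] && capabilities[:atomic_increment]
    :distributed_atomic # shared store with atomic increments (e.g. Redis)
  elsif capabilities[:distributed]
    :distributed_basic  # shared store, read-modify-write only
  else
    :local_only         # File/Memory stores: no cross-container view
  end
end

p select_sync_strategy({ distributed: true, atomic_increment: true }) # => :distributed_atomic
p select_sync_strategy({ distributed: true })                         # => :distributed_basic
p select_sync_strategy({ distributed: false })                        # => :local_only
```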

Examples:

Basic usage

backend = MetricsBackend.instance
backend.counter('api_requests_total').increment
backend.gauge('active_tasks').set(42)
backend.histogram('task_duration_seconds').observe(1.45)

EventRouter integration

# Automatic metric collection based on event routing
backend.handle_event('task.completed', { duration: 2.1, status: 'success' })

Cache synchronization

backend.sync_to_cache!  # Periodic background sync
backend.export_distributed_metrics  # Cross-container export

Constructor Details

#initialize ⇒ MetricsBackend

Initialize the metrics backend

Sets up thread-safe storage, integrates with EventRouter, and configures cache capabilities for hybrid architecture. Called automatically via singleton pattern.

# File 'lib/tasker/telemetry/metrics_backend.rb', line 84

def initialize
  @metrics = Concurrent::Hash.new
  @event_router = nil
  @local_buffer = []
  @last_sync = Time.current
  @created_at = Time.current.freeze

  # Thread-safe metric creation lock
  @metric_creation_lock = Mutex.new

  # Use unified cache strategy for capability detection
  @cache_strategy = Tasker::CacheStrategy.detect
  @instance_id = @cache_strategy.instance_id
  @sync_strategy = @cache_strategy.coordination_mode

  # Extract capabilities for backward compatibility
  @cache_capabilities = @cache_strategy.export_capabilities
  @sync_config = configure_sync_parameters

  log_cache_strategy_selection
end
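
Because the class includes Ruby's standard Singleton module, initialize runs once and callers obtain the shared object through instance. The sketch below shows that behaviour with a hypothetical ExampleBackend class; the Singleton semantics (one shared instance, private new) come from the Ruby standard library.

```ruby
require 'singleton'

# Hypothetical class illustrating the stdlib Singleton behaviour.
class ExampleBackend
  include Singleton

  attr_reader :created_at

  def initialize
    # Runs once, on the first call to ExampleBackend.instance
    @created_at = Time.now.freeze
  end
end

first = ExampleBackend.instance
second = ExampleBackend.instance
puts first.equal?(second) # => true (same object every time)

begin
  ExampleBackend.new
rescue NoMethodError => e
  # Singleton makes .new private, so direct construction fails
  puts "#{e.class}: new is private"
end
```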

Instance Attribute Details

#cache_capabilities ⇒ Hash (readonly)

Cache capabilities detected at initialization

Returns:

  • (Hash)

    Detected Rails.cache capabilities

# File 'lib/tasker/telemetry/metrics_backend.rb', line 61

def cache_capabilities
  @cache_capabilities
end

#cache_strategy ⇒ Tasker::CacheStrategy (readonly)

Cache strategy for this instance

Returns:

  • (Tasker::CacheStrategy)

# File 'lib/tasker/telemetry/metrics_backend.rb', line 77

def cache_strategy
  @cache_strategy
end

#created_at ⇒ Time (readonly)

Backend creation timestamp for monitoring

Returns:

  • (Time)

    When this backend was initialized

# File 'lib/tasker/telemetry/metrics_backend.rb', line 57

def created_at
  @created_at
end

#event_router ⇒ EventRouter (readonly)

EventRouter instance for intelligent event routing

Returns:

  • (EventRouter, nil)

    The registered router, or nil until register_event_router has been called

# File 'lib/tasker/telemetry/metrics_backend.rb', line 53

def event_router
  @event_router
end

#instance_id ⇒ String (readonly)

Unique instance identifier for distributed coordination

Returns:

  • (String)

    Hostname-PID identifier for this instance

# File 'lib/tasker/telemetry/metrics_backend.rb', line 69

def instance_id
  @instance_id
end
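
A Hostname-PID identifier like the one described can be built from the standard library. This is an assumption about the format only; the actual value is taken from Tasker::CacheStrategy#instance_id, and build_instance_id is a hypothetical helper.

```ruby
require 'socket'

# Hypothetical helper: build a "hostname-pid" identifier for this process.
def build_instance_id
  "#{Socket.gethostname}-#{Process.pid}"
end

puts build_instance_id
```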

#metrics ⇒ Concurrent::Hash (readonly)

Core metric registry storing all active metrics. Backed by Concurrent::Hash for thread-safe reads and writes; creation of new metrics is additionally serialized by a mutex.

Returns:

  • (Concurrent::Hash)

    Thread-safe metric storage

# File 'lib/tasker/telemetry/metrics_backend.rb', line 49

def metrics
  @metrics
end

#sync_config ⇒ Hash (readonly)

Configuration for cache synchronization

Returns:

  • (Hash)

    Sync configuration parameters

# File 'lib/tasker/telemetry/metrics_backend.rb', line 73

def sync_config
  @sync_config
end

#sync_strategy ⇒ Symbol (readonly)

Selected sync strategy based on cache capabilities

Returns:

  • (Symbol)

    One of :distributed_atomic, :distributed_basic, :local_only

# File 'lib/tasker/telemetry/metrics_backend.rb', line 65

def sync_strategy
  @sync_strategy
end

Instance Method Details

#all_metrics ⇒ Hash

Get all registered metrics

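Returns a shallow copy of the registry for export to monitoring systems like Prometheus. The returned Hash is new, but the metric objects in it are shared with the backend, so their values keep changing after the call.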

Returns:

  • (Hash)

    All metrics keyed by metric key

# File 'lib/tasker/telemetry/metrics_backend.rb', line 319

def all_metrics
  @metrics.each_with_object({}) do |(key, metric), result|
    result[key] = metric
  end
end
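
Since all_metrics copies only the key-to-metric mapping, metrics registered later do not appear in an earlier result, but updates to existing metrics do. The sketch below demonstrates this with a plain Hash and a hypothetical Metric struct standing in for the real registry and metric types.

```ruby
# Hypothetical stand-in: a mutable metric object in a plain Hash registry.
Metric = Struct.new(:value)
registry = { 'a' => Metric.new(1) }

# Same copying approach as all_metrics: build a new Hash from the registry
snapshot = registry.each_with_object({}) { |(key, metric), result| result[key] = metric }

registry['b'] = Metric.new(2) # added after the snapshot was taken
registry['a'].value += 10     # existing metric updated in place

p snapshot.keys       # => ["a"]  (the key set was fixed at copy time)
p snapshot['a'].value # => 11     (metric objects are shared, not copied)
```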

#clear! ⇒ Integer

Clear all metrics (primarily for testing)

Returns:

  • (Integer)

    Number of metrics cleared

# File 'lib/tasker/telemetry/metrics_backend.rb', line 359

def clear!
  cleared_count = @metrics.size
  @metrics.clear
  cleared_count
end

#counter(name, **labels) ⇒ MetricTypes::Counter

Get or create a counter metric

Counters are thread-safe and support only increment operations. They're ideal for counting events, requests, errors, etc.

Examples:

counter = backend.counter('http_requests_total', endpoint: '/api/tasks')
counter.increment(5)

Parameters:

  • name (String)

    The metric name

  • labels (Hash)

    Optional dimensional labels

Returns:

  • (MetricTypes::Counter)

Raises:

  • (ArgumentError)

    If name is invalid

# File 'lib/tasker/telemetry/metrics_backend.rb', line 130

def counter(name, **labels)
  get_or_create_metric(name, labels, :counter) do
    MetricTypes::Counter.new(name, labels: labels)
  end
end
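
counter, gauge, and histogram all delegate to get_or_create_metric, whose body is not shown on this page. A minimal sketch of a get-or-create helper follows, assuming metrics are keyed by name plus sorted labels and that creation is serialized by a mutex (as the @metric_creation_lock set up in initialize suggests). MiniRegistry, get_or_create, and the key format are hypothetical; the real method also receives a metric type.

```ruby
# Hypothetical get-or-create helper; key format and structure are assumptions.
class MiniRegistry
  def initialize
    @metrics = {}
    @lock = Mutex.new
  end

  def get_or_create(name, labels)
    key = build_key(name, labels)
    existing = @metrics[key]
    return existing if existing # fast path: skip the lock when it already exists

    @lock.synchronize do
      # Re-check inside the lock: another thread may have created it meanwhile
      @metrics[key] ||= yield
    end
  end

  private

  # Sort labels so { a: 1, b: 2 } and { b: 2, a: 1 } map to the same metric
  def build_key(name, labels)
    label_part = labels.sort.map { |k, v| "#{k}=#{v}" }.join(',')
    "#{name}{#{label_part}}"
  end
end

registry = MiniRegistry.new
first = registry.get_or_create('http_requests_total', { endpoint: '/api', method: 'GET' }) { Object.new }
second = registry.get_or_create('http_requests_total', { method: 'GET', endpoint: '/api' }) { Object.new }
puts first.equal?(second) # => true (label order does not matter)
```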

#export ⇒ Hash

Export all metrics to a format suitable for monitoring systems

Provides a comprehensive export of all metric data including metadata, current values, and statistical information.

Returns:

  • (Hash)

    Comprehensive metric export data

# File 'lib/tasker/telemetry/metrics_backend.rb', line 331

def export
  {
    timestamp: Time.current,
    backend_created_at: @created_at,
    total_metrics: @metrics.size,
    metrics: all_metrics.transform_values(&:to_h)
  }
end
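
Because export converts each metric with to_h, its result is plain data that serializes directly. The sketch below reproduces the same shape with a hypothetical ExampleMetric struct in place of the real metric types; converting the timestamps to ISO 8601 strings is an extra step added here for JSON output, not something export itself does.

```ruby
require 'json'
require 'time'

# Hypothetical stand-in metric; Struct#to_h returns a plain Hash of members.
ExampleMetric = Struct.new(:name, :type, :value)

metrics = {
  'api_requests_total' => ExampleMetric.new('api_requests_total', :counter, 42),
  'active_tasks' => ExampleMetric.new('active_tasks', :gauge, 7)
}
created_at = Time.now

# Same top-level keys as MetricsBackend#export
export = {
  timestamp: Time.now.iso8601,
  backend_created_at: created_at.iso8601,
  total_metrics: metrics.size,
  metrics: metrics.transform_values(&:to_h)
}

puts JSON.pretty_generate(export)
```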

#export_distributed_metrics(include_instances: false) ⇒ Hash

Export metrics with distributed coordination when supported

This method aggregates metrics across containers when the cache store supports it and falls back to a local export for limited cache stores.

Parameters:

  • include_instances (Boolean) (defaults to: false)

    Whether to include per-instance metrics

Returns:

  • (Hash)

    Export data with aggregated metrics when available

# File 'lib/tasker/telemetry/metrics_backend.rb', line 228

def export_distributed_metrics(include_instances: false)
  case @sync_strategy
  when :distributed_atomic, :distributed_basic
    aggregate_from_distributed_cache(include_instances: include_instances)
  when :local_only
    export_local_metrics_with_warning
  else
    export # Fallback to standard local export
  end
end
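
The aggregation performed by aggregate_from_distributed_cache is not shown here. As a rough illustration of cross-container aggregation, the sketch below sums counter values from per-instance snapshots; the snapshot layout, the aggregate_counters method, and the decision to skip gauges are all assumptions.

```ruby
# Hypothetical per-instance values as they might be read back from a shared
# cache; the { instance_id => { metric_key => value } } layout is assumed.
PER_INSTANCE = {
  'web-1-101' => { 'task_completed_total' => 10, 'workflow_active_tasks' => 3 },
  'web-2-202' => { 'task_completed_total' => 5, 'workflow_active_tasks' => 4 }
}.freeze

# Sum counter values across instances. Gauges are skipped because whether
# to sum, average, or keep them per instance is a policy choice.
def aggregate_counters(per_instance, counter_keys, include_instances: false)
  totals = Hash.new(0)
  per_instance.each_value do |values|
    values.each do |key, value|
      totals[key] += value if counter_keys.include?(key)
    end
  end

  result = { aggregated: totals }
  result[:instances] = per_instance if include_instances
  result
end

summary = aggregate_counters(PER_INSTANCE, ['task_completed_total'], include_instances: true)
p summary[:aggregated]
```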

#gauge(name, **labels) ⇒ MetricTypes::Gauge

Get or create a gauge metric

Gauges are thread-safe and support set, increment, and decrement operations. They're ideal for values that fluctuate like connections, queue depth, etc.

Examples:

gauge = backend.gauge('active_connections', service: 'api')
gauge.set(100)
gauge.increment(5)

Parameters:

  • name (String)

    The metric name

  • labels (Hash)

    Optional dimensional labels

Returns:

  • (MetricTypes::Gauge)

Raises:

  • (ArgumentError)

    If name is invalid

# File 'lib/tasker/telemetry/metrics_backend.rb', line 150

def gauge(name, **labels)
  get_or_create_metric(name, labels, :gauge) do
    MetricTypes::Gauge.new(name, labels: labels)
  end
end
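
A minimal thread-safe gauge supporting the three documented operations can be written with a Mutex. SimpleGauge is hypothetical and only illustrates the semantics; Tasker's MetricTypes::Gauge may be implemented differently.

```ruby
# Hypothetical minimal gauge; every operation runs under one Mutex.
class SimpleGauge
  def initialize(initial = 0)
    @value = initial
    @lock = Mutex.new
  end

  def set(value)
    @lock.synchronize { @value = value }
  end

  def increment(by = 1)
    @lock.synchronize { @value += by }
  end

  def decrement(by = 1)
    @lock.synchronize { @value -= by }
  end

  def value
    @lock.synchronize { @value }
  end
end

gauge = SimpleGauge.new
gauge.set(100)
gauge.increment(5)
gauge.decrement(2)
puts gauge.value # => 103
```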

#handle_event(event_name, payload = {}) ⇒ Boolean

Handle an event from EventRouter and collect appropriate metrics

This method is called by EventRouter when an event should be routed to the metrics backend. It automatically creates and updates metrics based on event type and payload.

Examples:

Automatic usage via EventRouter

# EventRouter calls this automatically:
backend.handle_event('task.completed', {
  task_id: '123',
  duration: 2.45,
  status: 'success'
})

Parameters:

  • event_name (String)

    The lifecycle event name

  • payload (Hash) (defaults to: {})

    Event payload with metric data

Returns:

  • (Boolean)

    False if the payload is not a Hash or an error was raised; otherwise true, including for event names that match no rule (in which case no metric is recorded)

# File 'lib/tasker/telemetry/metrics_backend.rb', line 257

def handle_event(event_name, payload = {})
  return false unless payload.is_a?(Hash)

  case event_name
  when /\.started$/
    # Task/Step started events -> increment counter
    counter("#{extract_entity_type(event_name)}_started_total", **extract_labels(payload)).increment

  when /\.completed$/
    # Task/Step completed events -> counter + duration histogram
    entity_type = extract_entity_type(event_name)
    labels = extract_labels(payload)

    counter("#{entity_type}_completed_total", **labels).increment

    if (duration = extract_duration(payload))
      histogram("#{entity_type}_duration_seconds", **labels).observe(duration)
    end

  when /\.failed$/
    # Task/Step failed events -> error counter + duration histogram
    entity_type = extract_entity_type(event_name)
    labels = extract_labels(payload)

    counter("#{entity_type}_failed_total", **labels).increment

    if (duration = extract_duration(payload))
      histogram("#{entity_type}_duration_seconds", **labels).observe(duration)
    end

  when /\.cancelled$/
    # Task/Step cancelled events -> cancellation counter
    counter("#{extract_entity_type(event_name)}_cancelled_total", **extract_labels(payload)).increment

  when /workflow\.iteration/
    # Workflow iteration events -> gauge for active tasks
    gauge('workflow_active_tasks').set(payload[:active_task_count]) if payload[:active_task_count]

    if payload[:iteration_duration]
      histogram('workflow_iteration_duration_seconds').observe(payload[:iteration_duration])
    end

  when /system\.health/
    # System health events -> health gauges
    gauge('system_healthy_tasks').set(payload[:healthy_task_count]) if payload[:healthy_task_count]

    gauge('system_failed_tasks').set(payload[:failed_task_count]) if payload[:failed_task_count]
  end

  true
rescue StandardError => e
  # Fail gracefully - metrics collection should never break the application
  warn "MetricsBackend failed to handle event #{event_name}: #{e.message}"
  false
end
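
The suffix rules in handle_event determine which metric names an event produces. The sketch below isolates that mapping for the lifecycle suffixes only. It assumes extract_entity_type returns the segment before the first dot (the helper's real body is not shown), and metric_names_for is hypothetical.

```ruby
# Hypothetical mapping from lifecycle event names to metric names.
# Assumes the entity type is the segment before the first ".".
def metric_names_for(event_name)
  entity_type = event_name.split('.').first

  case event_name
  when /\.started\z/
    ["#{entity_type}_started_total"]
  when /\.completed\z/
    # The duration histogram is only observed if the payload has a duration
    ["#{entity_type}_completed_total", "#{entity_type}_duration_seconds"]
  when /\.failed\z/
    ["#{entity_type}_failed_total", "#{entity_type}_duration_seconds"]
  when /\.cancelled\z/
    ["#{entity_type}_cancelled_total"]
  else
    []
  end
end

p metric_names_for('task.completed')
p metric_names_for('step.started')
```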

#histogram(name, buckets: nil, **labels) ⇒ MetricTypes::Histogram

Get or create a histogram metric

Histograms are thread-safe and provide statistical analysis of observed values. They're ideal for measuring durations, sizes, and distributions.

Examples:

histogram = backend.histogram('request_duration_seconds', method: 'POST')
histogram.observe(0.145)

Parameters:

  • name (String)

    The metric name

  • labels (Hash)

    Optional dimensional labels

  • buckets (Array<Numeric>) (defaults to: nil)

    Optional custom bucket boundaries

Returns:

  • (MetricTypes::Histogram)

Raises:

  • (ArgumentError)

    If name is invalid or buckets are malformed

# File 'lib/tasker/telemetry/metrics_backend.rb', line 170

def histogram(name, buckets: nil, **labels)
  get_or_create_metric(name, labels, :histogram) do
    if buckets
      MetricTypes::Histogram.new(name, labels: labels, buckets: buckets)
    else
      MetricTypes::Histogram.new(name, labels: labels)
    end
  end
end
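
To make the buckets parameter concrete, here is a minimal bucketed histogram. It assumes Prometheus-style cumulative buckets (each observation counts toward every bucket whose upper bound is at least the value); SimpleHistogram and its default bucket boundaries are hypothetical, and MetricTypes::Histogram may store buckets differently.

```ruby
# Hypothetical cumulative-bucket histogram (Prometheus style).
class SimpleHistogram
  DEFAULT_BUCKETS = [0.1, 0.5, 1.0, 2.5, 5.0].freeze # assumed defaults

  attr_reader :count, :sum

  def initialize(buckets: DEFAULT_BUCKETS)
    if buckets.empty? || buckets != buckets.sort
      raise ArgumentError, 'buckets must be non-empty and ascending'
    end

    @buckets = buckets.dup.freeze
    @bucket_counts = Array.new(buckets.size, 0)
    @count = 0
    @sum = 0.0
    @lock = Mutex.new
  end

  # Record one value under the lock
  def observe(value)
    @lock.synchronize do
      @count += 1
      @sum += value
      @buckets.each_with_index do |bound, i|
        @bucket_counts[i] += 1 if value <= bound
      end
    end
  end

  # Cumulative counts keyed by upper bound; values above the last bound
  # only show up in count and sum (the implicit +Inf bucket).
  def bucket_counts
    @lock.synchronize { @buckets.zip(@bucket_counts).to_h }
  end
end

h = SimpleHistogram.new
[0.05, 0.3, 1.45, 7.0].each { |v| h.observe(v) }
p h.bucket_counts # 0.1: 1, 0.5: 2, 1.0: 2, 2.5: 3, 5.0: 3
```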

#register_event_router(router) ⇒ EventRouter

Register the EventRouter for intelligent routing

This method is called by EventRouter during configuration to enable automatic metric collection based on routing decisions.

Parameters:

  • router (EventRouter)

    The EventRouter to register

Returns:

  • (EventRouter)

    The registered router

# File 'lib/tasker/telemetry/metrics_backend.rb', line 113

def register_event_router(router)
  @event_router = router
end

#stats ⇒ Hash

Get summary statistics about the metrics backend

Returns:

  • (Hash)

    Backend statistics and health information

# File 'lib/tasker/telemetry/metrics_backend.rb', line 343

def stats
  metric_types = all_metrics.values.group_by { |m| m.to_h[:type] }

  {
    total_metrics: @metrics.size,
    counter_metrics: metric_types[:counter]&.size || 0,
    gauge_metrics: metric_types[:gauge]&.size || 0,
    histogram_metrics: metric_types[:histogram]&.size || 0,
    backend_uptime: Time.current - @created_at,
    created_at: @created_at
  }
end
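
The `&.size || 0` idiom in stats reports 0 for metric types that have no entries, because group_by only creates keys for types that actually occur. The sketch below shows the same grouping with a hypothetical StubMetric struct whose to_h exposes a :type key.

```ruby
# Hypothetical stand-in metric whose to_h includes a :type key.
StubMetric = Struct.new(:type)

metrics = [StubMetric.new(:counter), StubMetric.new(:counter), StubMetric.new(:histogram)]

# Group by reported type, as stats does; absent types get no key at all
by_type = metrics.group_by { |m| m.to_h[:type] }

summary = {
  counter_metrics: by_type[:counter]&.size || 0,
  gauge_metrics: by_type[:gauge]&.size || 0, # nil&.size is nil, so this is 0
  histogram_metrics: by_type[:histogram]&.size || 0
}
p summary
```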

#sync_to_cache! ⇒ Hash

Synchronize in-memory metrics to Rails.cache using the detected strategy

This method implements the core cache synchronization logic that adapts to the available Rails.cache store capabilities.

Examples:

Periodic sync (typically called from background job)

result = backend.sync_to_cache!
# => { success: true, synced_metrics: 42, strategy: :distributed_atomic }

Returns:

  • (Hash)

    Sync result with success status and metrics count; when a strategy runs, the result also carries :duration, :timestamp, and :instance_id

# File 'lib/tasker/telemetry/metrics_backend.rb', line 190

def sync_to_cache!
  return { success: false, error: 'Rails.cache not available' } unless rails_cache_available?

  start_time = Time.current

  case @sync_strategy
  when :distributed_atomic
    result = sync_with_atomic_operations
  when :distributed_basic
    result = sync_with_read_modify_write
  when :local_only
    result = sync_to_local_cache
  else
    return { success: false, error: "Unknown sync strategy: #{@sync_strategy}" }
  end

  final_result = result.merge(
    duration: Time.current - start_time,
    timestamp: Time.current.iso8601,
    instance_id: @instance_id
  )

  # Coordinate with export system
  coordinate_cache_sync(final_result)

  final_result
rescue StandardError => e
  log_sync_error(e)
  { success: false, error: e.message, timestamp: Time.current.iso8601 }
end
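
The difference between the :distributed_atomic and :distributed_basic strategies matters when several containers write the same key. The bodies of sync_with_atomic_operations and sync_with_read_modify_write are not shown here, so the sketch below only illustrates the general hazard with a hypothetical in-memory FakeCache: an atomic increment keeps both updates, while an unsynchronized read-modify-write can lose one.

```ruby
# Hypothetical in-memory cache used to contrast the two distributed strategies.
class FakeCache
  def initialize
    @data = Hash.new(0)
    @lock = Mutex.new
  end

  def read(key)
    @lock.synchronize { @data[key] }
  end

  def write(key, value)
    @lock.synchronize { @data[key] = value }
  end

  # Read and write happen under one lock, like Redis INCRBY
  def increment(key, by)
    @lock.synchronize { @data[key] += by }
  end
end

# :distributed_atomic style: each container pushes its delta atomically
atomic = FakeCache.new
atomic.increment('task_completed_total', 10) # container A
atomic.increment('task_completed_total', 5)  # container B

# :distributed_basic style: both containers read before either writes back
rmw = FakeCache.new
a_seen = rmw.read('task_completed_total')
b_seen = rmw.read('task_completed_total')
rmw.write('task_completed_total', a_seen + 10)
rmw.write('task_completed_total', b_seen + 5)

puts atomic.read('task_completed_total') # => 15
puts rmw.read('task_completed_total')    # => 5 (container A's update was lost)
```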