Class: PgqPrometheus::Processor

Inherits:
Object
  • Object
show all
Defined in:
lib/pgq_prometheus/processor.rb

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(labels = {}) ⇒ Processor

Returns a new instance of Processor.



64
65
66
# File 'lib/pgq_prometheus/processor.rb', line 64

def initialize(labels = {})
  @metric_labels = labels || {}
end

Class Attribute Details

.after_collectObject

Returns the value of attribute after_collect.



8
9
10
# File 'lib/pgq_prometheus/processor.rb', line 8

def after_collect
  @after_collect
end

.before_collectObject

Returns the value of attribute before_collect.



8
9
10
# File 'lib/pgq_prometheus/processor.rb', line 8

def before_collect
  @before_collect
end

.loggerObject

Returns the value of attribute logger.



8
9
10
# File 'lib/pgq_prometheus/processor.rb', line 8

def logger
  @logger
end

.on_errorObject

Returns the value of attribute on_error.



8
9
10
# File 'lib/pgq_prometheus/processor.rb', line 8

def on_error
  @on_error
end

.sql_callerObject

Returns the value of attribute sql_caller.



8
9
10
# File 'lib/pgq_prometheus/processor.rb', line 8

def sql_caller
  @sql_caller
end

Class Method Details

.running?Boolean

Returns:

  • (Boolean)


53
54
55
# File 'lib/pgq_prometheus/processor.rb', line 53

def running?
  defined?(@thread) && @thread
end

.start(client: nil, frequency: 30, labels: nil) ⇒ Object

Raises:

  • (ArgumentError)


14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/pgq_prometheus/processor.rb', line 14

def start(client: nil, frequency: 30, labels: nil)
  raise ArgumentError, "#{name}.sql_caller must be defined" if sql_caller.nil?

  stop

  client ||= PrometheusExporter::Client.default
  metric_labels = labels&.dup || {}
  process_collector = new(metric_labels)

  @thread = Thread.new do
    wrap_thread_loop(name) do
      sql_caller.release_connection
      logger&.info { "Start #{name}" }
      while true
        begin
          before_collect&.call
          metrics = process_collector.collect
          metrics.each do |metric|
            client.send_json metric
          end
          after_collect&.call
        rescue => e
          STDERR.puts "#{self.class} Failed To Collect Stats #{e.class} #{e.message}"
          logger&.error { "#{e.class} #{e.message} #{e.backtrace.join("\n")}" }
          on_error&.call(e)
        end
        sleep frequency
      end
    end
  end

  true
end

.stopObject



48
49
50
51
# File 'lib/pgq_prometheus/processor.rb', line 48

def stop
  @thread&.kill
  @thread = nil
end

.wrap_thread_loop(*tags) ⇒ Object



57
58
59
60
61
# File 'lib/pgq_prometheus/processor.rb', line 57

def wrap_thread_loop(*tags)
  return yield if logger.nil? || !logger.respond_to?(:tagged)

  logger.tagged(*tags) { yield }
end

Instance Method Details

#collectObject



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/pgq_prometheus/processor.rb', line 68

def collect
  metrics = []
  sql_caller.with_connection do

    sql_caller.queue_info.each do |queue_info|
      queue = queue_info[:queue_name]

      queue_metric_opts.each do |name, opts|
        value = opts[:apply].call(queue_info)
        labels = opts[:labels].merge(queue: queue)
        metrics << format_metric(name, value, labels)
      end

      sql_caller.consumer_info(queue).each do |consumer_info|
        consumer = consumer_info[:consumer_name]

        consumer_metric_opts.each do |name, opts|
          value = opts[:apply].call(consumer_info, queue_info)
          labels = opts[:labels].merge(queue: queue, consumer: consumer)
          metrics << format_metric(name, value, labels)
        end
      end
    end

    custom_metric_opts.each do |name, opts|
      value, labels = opts[:apply].call
      labels = (labels || {}).merge(opts[:labels])
      metrics << format_metric(name, value, labels)
    end
  end

  metrics
end