Class: Fluent::Plugin::PrometheusMonitorInput

Inherits:
Input
  • Object
show all
Includes:
PrometheusLabelParser
Defined in:
lib/fluent/plugin/in_prometheus_monitor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from PrometheusLabelParser

#parse_labels_elements

Constructor Details

#initialize ⇒ PrometheusMonitorInput

Returns a new instance of PrometheusMonitorInput.



15
16
17
18
# File 'lib/fluent/plugin/in_prometheus_monitor.rb', line 15

# Runs the superclass initializer, then stores the object returned by
# ::Prometheus::Client.registry in @registry.
def initialize
  super
  @registry = ::Prometheus::Client.registry
end

Instance Attribute Details

#registry ⇒ Object (readonly)

Returns the value of attribute registry.



13
14
15
# File 'lib/fluent/plugin/in_prometheus_monitor.rb', line 13

# Reader for @registry (assigned from ::Prometheus::Client.registry in
# #initialize).
#
# @return [Object] the current value of @registry
def registry
  @registry
end

Instance Method Details

#configure(conf) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluent/plugin/in_prometheus_monitor.rb', line 24

# Prepares the plugin from its configuration.
#
# Parses the label elements out of +conf+ into @base_labels, expands the
# 'hostname' and 'worker_id' placeholders inside every label value, and
# instantiates the monitor agent that #update_monitor_info reads from.
#
# @param conf the plugin configuration; handed to +super+ and to
#   parse_labels_elements
# @raise [Fluent::ConfigError] if any parsed label value is not a String
def configure(conf)
  super

  host = Socket.gethostname
  expander = Fluent::Plugin::Prometheus
    .placeholder_expander(log)
    .build({ 'hostname' => host, 'worker_id' => fluentd_worker_id })

  @base_labels = parse_labels_elements(conf)
  # Values are rewritten in place; only existing keys are reassigned, so
  # updating the hash while iterating over it is safe.
  @base_labels.each_pair do |label_name, raw_value|
    raise Fluent::ConfigError, "record accessor syntax is not available in prometheus_monitor" unless raw_value.is_a?(String)

    @base_labels[label_name] = expander.expand(raw_value)
  end

  @monitor_agent =
    if defined?(Fluent::Plugin) && defined?(Fluent::Plugin::MonitorAgentInput)
      # from v0.14.6
      Fluent::Plugin::MonitorAgentInput.new
    else
      Fluent::MonitorAgentInput.new
    end
end

#labels(plugin_info) ⇒ Object



91
92
93
94
95
96
97
# File 'lib/fluent/plugin/in_prometheus_monitor.rb', line 91

# Builds the label set for one plugin.
#
# Combines @base_labels with the plugin's "plugin_id", "plugin_category" and
# "type" values. Hash#merge returns a new hash, so @base_labels itself is not
# modified; on a key collision the per-plugin value wins.
#
# @param plugin_info [Hash] plugin description read with String keys
# @return [Hash] base labels plus :plugin_id, :plugin_category and :type
def labels(plugin_info)
  per_plugin = {
    plugin_id: plugin_info["plugin_id"],
    plugin_category: plugin_info["plugin_category"],
    type: plugin_info["type"],
  }
  @base_labels.merge(per_plugin)
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


20
21
22
# File 'lib/fluent/plugin/in_prometheus_monitor.rb', line 20

# Always answers true.
# NOTE(review): presumably the hook Fluentd consults to decide whether the
# plugin may run under multiple workers - confirm against the plugin API.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#start ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/fluent/plugin/in_prometheus_monitor.rb', line 46

# Registers the monitoring gauges and schedules the refresh callback.
#
# Five gauges are created on @registry. The two timekey gauges are kept in
# their own instance variables because #update_monitor_info fills them from
# the "buffer_timekeys" list; the remaining three are stored in
# @monitor_info, keyed by the plugin-info field each one mirrors.
# Finally #update_monitor_info is handed to timer_execute together with
# @interval.
def start
  super

  @buffer_newest_timekey =
    @registry.gauge(:fluentd_status_buffer_newest_timekey, 'Newest timekey in buffer.')
  @buffer_oldest_timekey =
    @registry.gauge(:fluentd_status_buffer_oldest_timekey, 'Oldest timekey in buffer.')

  # Hash literal entries are evaluated top to bottom, so the gauges are
  # registered in the order they are listed here.
  @monitor_info = {
    'buffer_queue_length' =>
      @registry.gauge(:fluentd_status_buffer_queue_length, 'Current buffer queue length.'),
    'buffer_total_queued_size' =>
      @registry.gauge(:fluentd_status_buffer_total_bytes, 'Current total size of queued buffers.'),
    'retry_count' =>
      @registry.gauge(:fluentd_status_retry_count, 'Current retry counts.'),
  }

  timer_execute(:in_prometheus_monitor, @interval, &method(:update_monitor_info))
end

#update_monitor_info ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/fluent/plugin/in_prometheus_monitor.rb', line 73

# Copies the current plugin statistics into the gauges.
#
# For every entry returned by @monitor_agent.plugins_info_all:
# * each gauge in @monitor_info is set from the entry's field of the same
#   name, skipping fields whose value is nil or false (0 is still written);
# * when "buffer_timekeys" is present and non-empty, its largest and
#   smallest elements are written to the newest / oldest timekey gauges.
#
# All values are recorded under the label set built by #labels.
def update_monitor_info
  @monitor_agent.plugins_info_all.each do |plugin|
    plugin_labels = labels(plugin)

    @monitor_info.each do |field, gauge|
      reading = plugin[field]
      gauge.set(plugin_labels, reading) if reading
    end

    timekeys = plugin["buffer_timekeys"]
    next if !timekeys || timekeys.empty?

    @buffer_newest_timekey.set(plugin_labels, timekeys.max)
    @buffer_oldest_timekey.set(plugin_labels, timekeys.min)
  end
end