Class: Fluent::Plugin::QuotaThrottleFilter

Inherits:
Filter
  • Object
show all
Includes:
Prometheus, PrometheusLabelParser
Defined in:
lib/fluent/plugin/filter_quota_throttle.rb

Overview

QuotaThrottleFilter class is derived from the Filter class and is responsible for filtering records based on quotas

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeQuotaThrottleFilter

Returns a new instance of QuotaThrottleFilter.



28
29
30
31
32
33
# File 'lib/fluent/plugin/filter_quota_throttle.rb', line 28

def initialize
  super
  @reemit_tag_prefix = "secondary"
  @registry = ::Prometheus::Client.registry
  @placeholder_expander_builder = Fluent::Plugin::Prometheus.placeholder_expander(log)
end

Instance Attribute Details

#registryObject (readonly)

Returns the value of attribute registry.



17
18
19
# File 'lib/fluent/plugin/filter_quota_throttle.rb', line 17

def registry
  @registry
end

Instance Method Details

#configure(conf) ⇒ Object

Configures the plugin



36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/fluent/plugin/filter_quota_throttle.rb', line 36

def configure(conf)
  super
  raise "quota config file should not be empty" \
    if @path.nil? or !File.exist?(@path)
  raise "Warning delay should be non negative" \
    if @warning_delay < 0
  parsed_config = ConfigParser::Configuration.new(@path)
  @match_helper = Matcher::MatchHelper.new(parsed_config.quotas, parsed_config.default_quota)
  if @enable_metrics
    @base_labels = parse_labels_elements(conf)
  end
end

#filter(tag, time, record) ⇒ Object

Filters records based on quotas Params:

+tag+: (String) The tag of the record
+time+: (Time) The timestamp of the record
+record+: (Hash) The record to filter


77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/filter_quota_throttle.rb', line 77

def filter(tag, time, record)
  @bucket_store.clean_buckets
  quota = @match_helper.get_quota(record)
  group = quota.group_by.map { |key| record.dig(*key) }
  bucket = @bucket_store.get_bucket(group, quota)
  labels = {}
  if @enable_metrics
    labels = get_labels(record)
    @metrics[:quota_input].increment(by: 1, labels: labels.merge({quota: quota.name}))
  end
  if bucket.allow
    record
  else
    if @enable_metrics
      @metrics[:quota_exceeded].increment(by: 1, labels: labels.merge({quota: quota.name}))
    end
    quota_breached(tag, time, record, bucket, quota)
    nil
  end
end

#shutdownObject



60
61
62
63
64
65
66
67
68
69
# File 'lib/fluent/plugin/filter_quota_throttle.rb', line 60

def shutdown
  super
  if @enable_metrics
    log.info "Clearing Counters"
    @metrics.each do |name, metric|
      @registry.unregister(name)
    end
  end
  log.info "Shutting down"
end

#startObject



49
50
51
52
53
54
55
56
57
58
# File 'lib/fluent/plugin/filter_quota_throttle.rb', line 49

def start
  super
  @bucket_store = RateLimiter::BucketStore.new
  if @enable_metrics
    @metrics = {
       quota_input: get_counter(:fluentd_quota_throttle_input, "Number of records entering quota throttle plugin"),
       quota_exceeded: get_counter(:fluentd_quota_throttle_exceeded, "Number of records exceeded the quota"),
    }
  end
end