Class: Fluent::Plugin::QuotaThrottleFilter
- Inherits:
- Filter
- Inheritance chain:
- Object > Filter > Fluent::Plugin::QuotaThrottleFilter
- Includes:
- Prometheus, PrometheusLabelParser
- Defined in:
- lib/fluent/plugin/filter_quota_throttle.rb
Overview
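QuotaThrottleFilter derives from the Filter class and filters records based on quotas. Each record is matched to a quota, grouped by that quota's group_by keys, and passed through only while the bucket for its group still allows it.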
Instance Attribute Summary
- #registry ⇒ Object (readonly)
  Returns the value of attribute registry.
Instance Method Summary
- #configure(conf) ⇒ Object
  Configures the plugin.
- #filter(tag, time, record) ⇒ Object
  Filters records based on quotas. Params:
  tag: (String) The tag of the record
  time: (Time) The timestamp of the record
  record: (Hash) The record to filter
- #initialize ⇒ QuotaThrottleFilter (constructor)
  A new instance of QuotaThrottleFilter.
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ QuotaThrottleFilter
Returns a new instance of QuotaThrottleFilter.
# File 'lib/fluent/plugin/filter_quota_throttle.rb', line 28
def initialize
  super
  @reemit_tag_prefix = "secondary"
  @registry = ::Prometheus::Client.registry
  @placeholder_expander_builder = Fluent::Plugin::Prometheus.(log)
end
Instance Attribute Details
#registry ⇒ Object (readonly)
Returns the value of attribute registry.
# File 'lib/fluent/plugin/filter_quota_throttle.rb', line 17
def registry
  @registry
end
Instance Method Details
#configure(conf) ⇒ Object
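Configures the plugin. Raises unless the quota config file at path exists and warning_delay is non-negative, then parses the quota file and builds the matcher used by #filter.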
# File 'lib/fluent/plugin/filter_quota_throttle.rb', line 36
def configure(conf)
  super
  raise "quota config file should not be empty" \
    if @path.nil? or !File.exist?(@path)
  raise "Warning delay should be non negative" \
    if @warning_delay < 0
  parsed_config = ConfigParser::Configuration.new(@path)
  @match_helper = Matcher::MatchHelper.new(parsed_config.quotas, parsed_config.default_quota)
  if @enable_metrics
    @base_labels = parse_labels_elements(conf)
  end
end
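The two guards in #configure can be exercised in isolation. The following is a minimal sketch that mirrors them in a standalone method. The name validate_quota_settings is hypothetical, ArgumentError is an assumption (the plugin raises with a plain string), and the messages are split per condition for illustration only.

```ruby
require "tempfile"

# Hypothetical standalone version of the checks performed in #configure.
# Assumption: ArgumentError is used here; the plugin itself raises a plain string.
def validate_quota_settings(path, warning_delay)
  raise ArgumentError, "quota config path is not set" if path.nil?
  raise ArgumentError, "quota config file not found: #{path}" unless File.exist?(path)
  raise ArgumentError, "warning delay must be non-negative" if warning_delay.negative?
  true
end

# Usage: an existing file and a zero delay pass both checks.
Tempfile.create("quota") do |file|
  puts validate_quota_settings(file.path, 0)
end
```

Splitting the nil check from the existence check makes it easier to tell an unset path from a wrong one, which the single combined message in the original does not distinguish.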
#filter(tag, time, record) ⇒ Object
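Filters records based on quotas. Returns the record when its bucket allows it; otherwise calls quota_breached and returns nil, which drops the record.
Params: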
+tag+: (String) The tag of the record
+time+: (Time) The timestamp of the record
+record+: (Hash) The record to filter
# File 'lib/fluent/plugin/filter_quota_throttle.rb', line 77
def filter(tag, time, record)
  @bucket_store.clean_buckets
  quota = @match_helper.get_quota(record)
  group = quota.group_by.map { |key| record.dig(*key) }
  bucket = @bucket_store.get_bucket(group, quota)
  labels = {}
  if @enable_metrics
    labels = get_labels(record)
    @metrics[:quota_input].increment(by: 1, labels: labels.merge({quota: quota.name}))
  end
  if bucket.allow
    record
  else
    if @enable_metrics
      @metrics[:quota_exceeded].increment(by: 1, labels: labels.merge({quota: quota.name}))
    end
    quota_breached(tag, time, record, bucket, quota)
    nil
  end
end
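The grouping step in #filter relies on Hash#dig: each entry of quota.group_by is treated as a path of keys into the record. The sketch below isolates that one expression. The helper name group_for, the sample record, and the key paths are assumptions made for illustration and are not taken from the plugin's configuration format.

```ruby
# Extracts the grouping values for a record, one value per key path.
# `group_by` is an array of key paths, e.g. [["kubernetes", "namespace"], ["level"]].
# Hypothetical helper; the plugin performs this inline in #filter.
def group_for(record, group_by)
  group_by.map { |key| record.dig(*key) }
end

# Sample data; the field names are illustrative assumptions.
group_by = [["kubernetes", "namespace"], ["level"]]
record = {
  "kubernetes" => { "namespace" => "payments", "pod" => "api-1" },
  "level" => "error",
  "message" => "timeout"
}

p group_for(record, group_by)
```

A path whose first key is absent from the record yields nil for that position, so records lacking a grouping field still map to a well-defined group.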
#shutdown ⇒ Object
# File 'lib/fluent/plugin/filter_quota_throttle.rb', line 60
def shutdown
  super
  if @enable_metrics
    log.info "Clearing Counters"
    @metrics.each do |name, metric|
      @registry.unregister(name)
    end
  end
  log.info "Shutting down"
end
#start ⇒ Object
# File 'lib/fluent/plugin/filter_quota_throttle.rb', line 49
def start
  super
  @bucket_store = RateLimiter::BucketStore.new
  if @enable_metrics
    @metrics = {
      quota_input: get_counter(:fluentd_quota_throttle_input, "Number of records entering quota throttle plugin"),
      quota_exceeded: get_counter(:fluentd_quota_throttle_exceeded, "Number of records exceeded the quota"),
    }
  end
end
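#start creates the bucket store that #filter queries on every record. RateLimiter::BucketStore is not shown on this page, so the following is only a sketch of how a store exposing the same three calls (clean_buckets, get_bucket, allow) could behave. It assumes a fixed-window counter; the class names SketchBucket and SketchBucketStore, the quota fields bucket_size and duration, and the expiry rule are all hypothetical, and the real implementation may differ.

```ruby
# Hypothetical quota shape; only `name` and `group_by` appear in the plugin source.
SketchQuota = Struct.new(:name, :group_by, :bucket_size, :duration)

# Fixed-window counter: at most `size` records per `duration` seconds.
class SketchBucket
  def initialize(size, duration, clock)
    @size = size
    @duration = duration
    @clock = clock
    @window_start = clock.call
    @count = 0
  end

  # Counts the record and reports whether it is still within the quota.
  def allow
    now = @clock.call
    if now - @window_start >= @duration
      @window_start = now
      @count = 0
    end
    @count += 1
    @count <= @size
  end

  # Assumption: a bucket is stale after two full windows without a reset.
  def expired?(now)
    now - @window_start >= 2 * @duration
  end
end

class SketchBucketStore
  def initialize(clock = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @clock = clock
    @buckets = {}
  end

  # One bucket per (quota name, group) pair, created on first use.
  def get_bucket(group, quota)
    @buckets[[quota.name, group]] ||= SketchBucket.new(quota.bucket_size, quota.duration, @clock)
  end

  # Drops stale buckets so the store does not grow without bound.
  def clean_buckets
    now = @clock.call
    @buckets.delete_if { |_key, bucket| bucket.expired?(now) }
  end

  def size
    @buckets.size
  end
end

# Usage with a controllable clock: two records allowed per 60 seconds.
time = [0.0]
store = SketchBucketStore.new(-> { time[0] })
quota = SketchQuota.new("per_namespace", [["namespace"]], 2, 60)
p 3.times.map { store.get_bucket(["payments"], quota).allow }
```

Injecting the clock keeps the sketch deterministic; in the usage above the third record in the same window is the first one rejected.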