Class: Fluent::Plugin::ComparisonFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_comparison.rb

Defined Under Namespace

Classes: ConfigError

Constant Summary collapse

DEFAULT_STORAGE_TYPE =
'local'

Instance Method Summary collapse

Constructor Details

#initializeComparisonFilter

Returns a new instance of ComparisonFilter.



23
24
25
26
27
28
# File 'lib/fluent/plugin/filter_comparison.rb', line 23

def initialize
  super
  @column_key = nil
  @column_key_type = nil
  @time_type = nil
end

Instance Method Details

#configure(conf) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/fluent/plugin/filter_comparison.rb', line 30

def configure(conf)
  super
  @column_key = @comparison_config.column_key
  @column_key_type = @comparison_config.column_key_type
  @time_type = @comparison_config.time_type

  if compare_by_time?
    @time_parser = case @time_type
      when :float then Fluent::NumericTimeParser.new(:float)
      when :unixtime then Fluent::NumericTimeParser.new(:unixtime)
      else
        localtime = @comparison_config.localtime && !@comparison_config.utc
        Fluent::TimeParser.new(@comparison_config.time_format, localtime, @comparison_config.timezone)
    end
  else
  end

  @storage = storage_create(usage: 'comparison', conf: nil, type: DEFAULT_STORAGE_TYPE)
end

#filter(tag, time, record) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/filter_comparison.rb', line 50

def filter(tag, time, record)
  result = nil
  last_recorded = fetch_comparison_key
  begin
    comparison_key = extract_comparison_column_from_record(record)
    if last_recorded.nil? || last_recorded < comparison_key
      set_comparison_key(comparison_key)
      result = record
    end
  rescue => e
    log.warn "failed to filter records", error: e
    log.warn_backtrace
  end
  result
end