Class: Fluent::Plugin::TimeSamplingFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_time_sampling.rb

Instance Method Summary collapse

Instance Method Details

#cache(key, data = nil) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/fluent/plugin/filter_time_sampling.rb', line 56

# Combined reader/writer for the per-key sampling cache held in @cache.
#
# Write mode (data is not nil): stores data under key together with an
# expiry of "now + @interval" and returns the stored entry hash.
#
# Read mode (data is nil): returns the stored data while the entry is
# still valid, or nil once it has expired. A key that has never been
# written gets a placeholder entry whose expiry is the epoch, so it is
# reported as expired.
#
# @param key [Object] cache key
# @param data [Object, nil] value to store; nil selects read mode
# @return [Object, nil] the entry hash (write) or the cached data / nil (read)
def cache(key, data = nil)
  unless data.nil?
    return @cache[key] = { :data => data, :time => Time.now + @interval }
  end

  # Unknown keys are materialised as already-expired entries.
  entry = (@cache[key] ||= { :time => Time.at(0) })
  return nil if Time.now > entry[:time]

  entry[:data]
end

#clear_cache ⇒ Object



69
70
71
72
73
74
# File 'lib/fluent/plugin/filter_time_sampling.rb', line 69

# Removes every entry from @cache whose :time (expiry) lies in the past.
#
# Uses Hash#delete_if rather than deleting inside each_key, so the hash is
# not mutated from within its own iteration, and all entries are compared
# against a single timestamp taken once per sweep.
#
# @return [Hash] the (pruned) @cache hash
def clear_cache
  now = Time.now
  @cache.delete_if { |_key, entry| now > entry[:time] }
end

#configure(conf) ⇒ Object



12
13
14
15
16
17
18
# File 'lib/fluent/plugin/filter_time_sampling.rb', line 12

# Initialises the sampling state after the superclass has processed conf.
#
# Sets up an empty cache, records the current time as the last sweep time,
# and schedules sweeps at twice the sampling interval. A zero interval is
# stored as -1, so every cache entry written afterwards is already expired
# when it is read back.
#
# NOTE(review): @interval is read here without being assigned in this
# method; presumably super populates it from the plugin configuration.
#
# @param conf [Object] configuration passed through to super
# @return [Integer, nil] -1 when the interval was zero, otherwise nil
def configure(conf)
  super

  @cache = {}
  @cache_clear_lasttime = Time.now

  configured = @interval
  # Derived from the configured value, before the zero -> -1 rewrite below.
  @cache_clear_interval = configured * 2
  @interval = -1 if configured.zero?
end

#filter(tag, time, record) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/fluent/plugin/filter_time_sampling.rb', line 28

# Lets through at most one record per sampling key per cache lifetime.
#
# The sampling key is built from the entries of @unit joined with ":";
# the literal "${tag}" entry stands for the event tag, every other entry
# names a record field. If no valid cache entry exists for the key, the
# key is cached and the record (or only its @keep_keys fields) is
# returned; otherwise nil is returned.
#
# Independently of the outcome, expired cache entries are swept once
# @cache_clear_interval has elapsed since the previous sweep.
#
# @param tag [String] event tag
# @param time [Object] event time (not used by this method)
# @param record [Hash] event record
# @return [Hash, nil] the sampled record, or nil when it is suppressed
#   (presumably Fluentd drops events for which a filter returns nil)
# @raise [RuntimeError] if a @unit field or a @keep_keys field is missing
def filter(tag, time, record)
  key_parts = []
  @unit.each do |name|
    if name == "${tag}"
      key_parts << tag
    else
      raise "#{name}: unit not found" unless record.has_key?(name)
      key_parts << record[name]
    end
  end
  sampling_key = key_parts.join(":")

  # Validate keep_keys up front, even if this record ends up suppressed.
  @keep_keys.each do |name|
    raise "#{name}: keep_keys not found" unless record.has_key?(name)
  end

  sampled = nil
  if !cache(sampling_key)
    # The empty string is just a truthy marker meaning "already emitted".
    cache(sampling_key, "")
    sampled =
      if @keep_keys.empty?
        record.dup
      else
        record.select { |field, _value| @keep_keys.include?(field) }
      end
  end

  sweep_due = Time.now > @cache_clear_lasttime + @cache_clear_interval
  if sweep_due
    clear_cache
    @cache_clear_lasttime = Time.now
  end

  sampled
end

#shutdown ⇒ Object



24
25
26
# File 'lib/fluent/plugin/filter_time_sampling.rb', line 24

# Shutdown hook. Adds no behaviour of its own; it only delegates to the
# superclass implementation and returns whatever that returns.
def shutdown
  super
end

#start ⇒ Object



20
21
22
# File 'lib/fluent/plugin/filter_time_sampling.rb', line 20

# Start hook. Adds no behaviour of its own; it only delegates to the
# superclass implementation and returns whatever that returns.
def start
  super
end