Class: Fluent::Plugin::SamplingFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_sampling.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



14
15
16
17
18
19
20
# File 'lib/fluent/plugin/filter_sampling.rb', line 14

# Prepares per-instance sampling state after the parent class has processed
# the configuration.
#
# @counts always starts as an empty hash. @resets is created only when
# @minimum_rate_per_min is set. @accessor is built only when @sample_unit is
# something other than 'all' or 'tag'.
#
# @param conf the plugin configuration, forwarded to the parent via `super`
def configure(conf)
  super

  @counts = {}
  @resets = {} if @minimum_rate_per_min

  # 'all' and 'tag' never look inside the record, so no accessor is needed.
  return if ['all', 'tag'].include?(@sample_unit)

  # Presumably a Fluentd record-accessor helper; verify against the plugin's helpers.
  @accessor = record_accessor_create(@sample_unit)
end

#filter(tag, _time, record) ⇒ Object

Access to @counts should be protected by a mutex, but that carries a heavy performance penalty. The code below is not thread-safe; however, @counts (the counter for the sampling rate) is not a critical value (and probably will not be corrupted…), so I have left it as it is for now.



27
28
29
30
31
32
33
34
# File 'lib/fluent/plugin/filter_sampling.rb', line 27

# Decides whether a single event is kept or dropped.
#
# The sampling key is obtained from record_key, then the work is handed to
# filter_with_minimum_rate when @minimum_rate_per_min is set, or to
# filter_simple otherwise.
#
# @param tag the event tag
# @param _time the event time (unused here)
# @param record the event record
# @return whatever the selected strategy method returns
def filter(tag, _time, record)
  key = record_key(tag, record)
  return filter_with_minimum_rate(key, record) if @minimum_rate_per_min

  filter_simple(key, record)
end

#filter_simple(t, record) ⇒ Object



36
37
38
39
40
41
42
43
44
45
# File 'lib/fluent/plugin/filter_sampling.rb', line 36

# Plain interval sampling: keeps every @interval-th record seen for key +t+.
#
# @param t the sampling key
# @param record the event record
# @return the same record object when it is kept, nil when it is dropped
def filter_simple(t, record)
  seen = @counts.fetch(t, 0) + 1
  # Wrap the stored counter back to 0 once it passes 0x6fffffff (historically
  # the point just before a Fixnum would be promoted to a Bignum).
  @counts[t] = seen > 0x6fffffff ? 0 : seen
  record if (seen % @interval).zero?
end

#filter_with_minimum_rate(t, record) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/fluent/plugin/filter_sampling.rb', line 47

# Interval sampling with a guaranteed per-window floor for key +t+.
#
# Within each window the first @minimum_rate_per_min records pass
# unconditionally; beyond that, only every @interval-th record passes.
# When the window deadline stored in @resets[t] has passed, the counter for
# the key is reset and a new 60-unit window starts (Fluent::Clock.now is
# presumably in seconds, matching the "per minute" naming; confirm).
#
# @param t the sampling key
# @param record the event record
# @return a copy of the record (via #dup) when it is kept, nil when dropped
def filter_with_minimum_rate(t, record)
  # Read the clock once so the expiry check and the new deadline agree.
  now = Fluent::Clock.now

  # The first window for a key is shortened by a random 0..29 units,
  # presumably to stagger resets across keys.
  @resets[t] ||= now + (60 - rand(30))
  if now > @resets[t]
    @resets[t] = now + 60
    @counts[t] = 0
  end

  c = (@counts[t] = @counts.fetch(t, 0) + 1)

  # `<=` (not `<`) so that the full configured minimum passes per window;
  # with `<` only @minimum_rate_per_min - 1 records were guaranteed.
  return record.dup if c <= @minimum_rate_per_min || (c % @interval).zero?

  nil
end

#record_key(tag, record) ⇒ Object



61
62
63
64
65
66
67
68
69
70
# File 'lib/fluent/plugin/filter_sampling.rb', line 61

# Picks the key under which a record is counted for sampling.
#
# @param tag the event tag
# @param record the event record
# @return 'all' when @sample_unit is 'all', the tag when it is 'tag',
#   otherwise the value @accessor extracts from the record
def record_key(tag, record)
  return 'all' if @sample_unit == 'all'
  return tag if @sample_unit == 'tag'

  @accessor.call(record)
end