Class: Fluent::HistogramOutput

Inherits:
Output
  • Object
show all
Includes:
Mixin::ConfigPlaceholders
Defined in:
lib/fluent/plugin/out_histogram.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeHistogramOutput

fluentd output plugin’s methods



28
29
30
# File 'lib/fluent/plugin/out_histogram.rb', line 28

def initialize
  super
end

Instance Attribute Details

#flush_intervalObject

Returns the value of attribute flush_interval.



21
22
23
# File 'lib/fluent/plugin/out_histogram.rb', line 21

def flush_interval
  @flush_interval
end

#histsObject

Returns the value of attribute hists.



22
23
24
# File 'lib/fluent/plugin/out_histogram.rb', line 22

def hists
  @hists
end

#remove_prefix_stringObject

Returns the value of attribute remove_prefix_string.



24
25
26
# File 'lib/fluent/plugin/out_histogram.rb', line 24

def remove_prefix_string
  @remove_prefix_string
end

#zero_histObject

Returns the value of attribute zero_hist.



23
24
25
# File 'lib/fluent/plugin/out_histogram.rb', line 23

def zero_hist
  @zero_hist
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin/out_histogram.rb', line 32

def configure(conf)
  super

  raise Fluent::ConfigError, "bin_num must be > 0" if @bin_num <= 0
  $log.warn %Q[too small "bin_num(=#{@bin_num})" may raise unexpected outcome] if @bin_num < 100

  @tag_prefix_string = @tag_prefix + '.' if @tag_prefix
  @tag_suffix_string = '.' + @tag_suffix if @tag_suffix
  if @input_tag_remove_prefix
    @remove_prefix_string = @input_tag_remove_prefix + '.'
    @remove_prefix_length = @remove_prefix_string.length
  end

  @zero_hist = [0] * @bin_num

  @hists = initialize_hists
  @sampling_counter = 0
  @mutex = Mutex.new

end

#emit(tag, es, chain) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/fluent/plugin/out_histogram.rb', line 101

def emit(tag, es, chain)
  chain.next

  es.each do |time, record|
    keys = record[@count_key]
    [keys].flatten.each do |k| 
      if @sampling_rate == 1
        increment(tag, k)
      else
        @sampling_counter += 1
        if @sampling_counter >= @sampling_rate 
          increment(tag, k) 
          @sampling_counter = 0
        end
      end
    end
  end
end

#flushObject



166
167
168
169
# File 'lib/fluent/plugin/out_histogram.rb', line 166

def flush
  flushed, @hists = generate_output(@hists), initialize_hists(@hists.keys.dup)
  tagging(flushed)
end

#flush_emit(now) ⇒ Object



171
172
173
174
175
176
# File 'lib/fluent/plugin/out_histogram.rb', line 171

def flush_emit(now)
  flushed = flush
  flushed.each do |tag, data|
    Fluent::Engine.emit(tag, now, data)
  end
end

#generate_output(flushed) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/fluent/plugin/out_histogram.rb', line 148

def generate_output(flushed)
  output = {}
  flushed.each do |tag, hist|
    output[tag] = {}
    sum = hist.inject(:+)
    avg = sum / hist.size
    sd = hist.instance_eval do
      sigmas = map { |n| (avg - n)**2 }
      Math.sqrt(sigmas.inject(:+) / size)
    end
    output[tag][:hist] = hist
    output[tag][:sum] = sum
    output[tag][:avg] = avg
    output[tag][:sd] = sd.to_i
  end
  output
end

#increment(tag, key) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/out_histogram.rb', line 89

def increment(tag, key)
  @hists[tag] ||= @zero_hist.dup
  id = key.hash % @bin_num
  @mutex.synchronize {
    (0..@alpha).each do |alpha|
      (-alpha..alpha).each do |a|
        @hists[tag][(id + a) % @bin_num] += 1 * @sampling_rate
      end
    end
  }
end

#initialize_hists(tags = nil) ⇒ Object

Histogram plugin’s method



79
80
81
82
83
84
85
86
87
# File 'lib/fluent/plugin/out_histogram.rb', line 79

def initialize_hists(tags=nil)
  hists = {}
  if tags
    tags.each do |tag|
      hists[tag] = @zero_hist.dup
    end
  end
  hists
end

#shutdownObject



70
71
72
73
74
# File 'lib/fluent/plugin/out_histogram.rb', line 70

def shutdown
  super
  @watcher.terminate
  @watcher.join
end

#startObject



53
54
55
56
# File 'lib/fluent/plugin/out_histogram.rb', line 53

def start
  super
  @watcher = Thread.new(&method(:watch))
end

#tagging(flushed) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/fluent/plugin/out_histogram.rb', line 120

def tagging(flushed)
  tagged = {}
  tagged = Hash[ flushed.map do |tag, hist|
    if @tag 
      tag = @tag
    else
      if @input_tag_remove_prefix &&
        ( ( tag.start_with?(@remove_prefix_string) && 
           tag.length > @remove_prefix_length ) ||
           tag == @input_tag_remove_prefix)
        tag = tag[@input_tag_remove_prefix.length..-1]
        tag.gsub!(/^\.|\.$/, "")
      end
      if @tag_prefix 
        tag = @tag_prefix_string + tag
        tag.gsub!(/^\.|\.$/, "")
      end
      if @tag_suffix
        tag += @tag_suffix_string
        tag.gsub!(/^\.|\.$/, "")
      end
    end

    [tag, hist]
  end ]
  tagged
end

#watchObject



58
59
60
61
62
63
64
65
66
67
68
# File 'lib/fluent/plugin/out_histogram.rb', line 58

def watch
  @last_checked = Fluent::Engine.now
  while true
    sleep 0.5
    if Fluent::Engine.now - @last_checked >= @flush_interval
      now = Fluent::Engine.now
      flush_emit(now)
      @last_checked = now
    end
  end
end