Class: Fluent::HistogramOutput
- Inherits:
- Output
- Inheritance chain: Object < Output < Fluent::HistogramOutput
- Includes:
- Mixin::ConfigPlaceholders
- Defined in:
- lib/fluent/plugin/out_histogram.rb
Instance Attribute Summary collapse
-
#flush_interval ⇒ Object
Returns the value of attribute flush_interval.
-
#hists ⇒ Object
Returns the value of attribute hists.
-
#remove_prefix_string ⇒ Object
Returns the value of attribute remove_prefix_string.
-
#zero_hist ⇒ Object
Returns the value of attribute zero_hist.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #flush ⇒ Object
- #flush_emit(now) ⇒ Object
- #generate_output(flushed) ⇒ Object
- #increment(tag, key) ⇒ Object
-
#initialize ⇒ HistogramOutput
constructor
fluentd output plugin’s methods.
-
#initialize_hists(tags = nil) ⇒ Object
Histogram plugin’s method.
- #shutdown ⇒ Object
- #start ⇒ Object
- #tagging(flushed) ⇒ Object
- #watch ⇒ Object
Constructor Details
#initialize ⇒ HistogramOutput
fluentd output plugin’s methods
28 29 30 |
# File 'lib/fluent/plugin/out_histogram.rb', line 28
# Constructor. Performs no plugin-specific setup of its own; it only defers
# to the parent class initializer.
def initialize
  super
end
Instance Attribute Details
#flush_interval ⇒ Object
Returns the value of attribute flush_interval.
21 22 23 |
# File 'lib/fluent/plugin/out_histogram.rb', line 21
# Reader for the flush interval. #watch flushes once the engine clock has
# advanced by at least this amount since the previous flush.
#
# @return [Object] the current value of @flush_interval
def flush_interval
  @flush_interval
end
#hists ⇒ Object
Returns the value of attribute hists.
22 23 24 |
# File 'lib/fluent/plugin/out_histogram.rb', line 22
# Reader for the live histogram table: a Hash mapping each input tag to its
# Array of bin counts (built by #initialize_hists, filled by #increment).
#
# @return [Object] the current value of @hists
def hists
  @hists
end
#remove_prefix_string ⇒ Object
Returns the value of attribute remove_prefix_string.
24 25 26 |
# File 'lib/fluent/plugin/out_histogram.rb', line 24
# Reader for the dotted prefix (input_tag_remove_prefix followed by '.') that
# #configure builds when a prefix to strip is configured.
#
# @return [Object] the current value of @remove_prefix_string
def remove_prefix_string
  @remove_prefix_string
end
#zero_hist ⇒ Object
Returns the value of attribute zero_hist.
23 24 25 |
# File 'lib/fluent/plugin/out_histogram.rb', line 23
# Reader for the all-zero template histogram (bin_num zeros, built in
# #configure) that is duplicated whenever a tag needs a fresh histogram.
#
# @return [Object] the current value of @zero_hist
def zero_hist
  @zero_hist
end
Instance Method Details
#configure(conf) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/fluent/plugin/out_histogram.rb', line 32
# Validates the configuration and prepares the plugin's working state.
#
# @param conf [Object] configuration, handed unchanged to the parent class
# @raise [Fluent::ConfigError] when bin_num is zero or negative
def configure(conf)
  super

  raise Fluent::ConfigError, "bin_num must be > 0" unless @bin_num > 0
  if @bin_num < 100
    $log.warn %Q[too small "bin_num(=#{@bin_num})" may raise unexpected outcome]
  end

  # Pre-build the dotted fragments that #tagging concatenates onto tags.
  @tag_prefix_string = @tag_prefix + '.' if @tag_prefix
  @tag_suffix_string = '.' + @tag_suffix if @tag_suffix
  if @input_tag_remove_prefix
    @remove_prefix_string = @input_tag_remove_prefix + '.'
    @remove_prefix_length = @remove_prefix_string.size
  end

  # Template histogram; every per-tag histogram starts as a copy of it.
  @zero_hist = Array.new(@bin_num, 0)
  @hists = initialize_hists
  @sampling_counter = 0
  @mutex = Mutex.new
end
#emit(tag, es, chain) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/fluent/plugin/out_histogram.rb', line 101
# Counts the values found under @count_key in every record of the stream.
#
# The field may hold a single value or (nested) arrays of values; each
# flattened element is counted separately. With a sampling rate other than 1
# only every @sampling_rate-th element is passed to #increment.
#
# @param tag [String] tag of the incoming event stream
# @param es [#each] event stream yielding (time, record) pairs
# @param chain [#next] output chain, advanced before any record is processed
def emit(tag, es, chain)
  chain.next
  es.each do |_time, record|
    [record[@count_key]].flatten.each do |key|
      if @sampling_rate == 1
        increment(tag, key)
        next
      end

      # Sampling: skip elements until the counter reaches the rate.
      @sampling_counter += 1
      next unless @sampling_counter >= @sampling_rate

      increment(tag, key)
      @sampling_counter = 0
    end
  end
end
#flush ⇒ Object
166 167 168 169 |
# File 'lib/fluent/plugin/out_histogram.rb', line 166
# Detaches the accumulated histograms, resets the live table, and returns the
# detached data summarised and re-tagged for emission.
#
# The swap runs under @mutex, the same lock #increment holds while adding to
# bins, so a count cannot land in a table that has already been detached.
# Statistics are then computed on the detached snapshot outside the lock.
#
# @return [Hash] result of #tagging: output tag => summary from #generate_output
def flush
  snapshot = nil
  @mutex.synchronize do
    snapshot = @hists
    # The next window starts with a zeroed histogram for every tag seen so far.
    @hists = initialize_hists(snapshot.keys)
  end
  tagging(generate_output(snapshot))
end
#flush_emit(now) ⇒ Object
171 172 173 174 175 176 |
# File 'lib/fluent/plugin/out_histogram.rb', line 171
# Flushes the histograms and emits one event per output tag to the engine.
#
# @param now [Object] timestamp attached to every emitted event
# @return [Hash] the flushed tag => summary table that was emitted
def flush_emit(now)
  flush.each { |out_tag, summary| Fluent::Engine.emit(out_tag, now, summary) }
end
#generate_output(flushed) ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/fluent/plugin/out_histogram.rb', line 148
# Builds the per-tag summary that is emitted for each histogram.
#
# The standard deviation is computed with a floating-point mean and variance
# and truncated only at the end. Doing the intermediate steps with integer
# division skews the result (for [0, 2, 2, 2] it yields 1 where the true
# value truncates to 0).
# :avg is still the integer quotient sum / size, so the emitted record keeps
# the same field types.
#
# @param flushed [Hash] tag => Array of bin counts (non-empty)
# @return [Hash] tag => { :hist, :sum, :avg, :sd }
def generate_output(flushed)
  flushed.each_with_object({}) do |(tag, hist), output|
    size = hist.size
    sum = hist.inject(:+)
    mean = sum.to_f / size
    variance = hist.inject(0.0) { |acc, n| acc + (n - mean)**2 } / size
    output[tag] = {
      :hist => hist,
      :sum => sum,
      :avg => sum / size,
      :sd => Math.sqrt(variance).to_i
    }
  end
end
#increment(tag, key) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/fluent/plugin/out_histogram.rb', line 89
# Adds one (sampling-weighted) observation of +key+ to the histogram of +tag+.
#
# The bin is key.hash modulo @bin_num. For each a in 0..@alpha the bins from
# (bin - a) to (bin + a), wrapping around the ends, each receive
# @sampling_rate, so the centre bin gets the most weight.
#
# The per-tag histogram is created inside the critical section. Creating it
# before taking the lock lets a concurrent #flush replace @hists in between,
# which leaves @hists[tag] nil when the bins are updated.
#
# NOTE(review): Ruby's #hash for strings is documented to vary between
# processes, so bin positions are presumably not comparable across
# restarts — confirm whether that matters for consumers.
#
# @param tag [String] input tag whose histogram is updated
# @param key [Object] value being counted
def increment(tag, key)
  id = key.hash % @bin_num
  @mutex.synchronize do
    hist = (@hists[tag] ||= @zero_hist.dup)
    (0..@alpha).each do |alpha|
      (-alpha..alpha).each do |a|
        hist[(id + a) % @bin_num] += @sampling_rate
      end
    end
  end
end
#initialize_hists(tags = nil) ⇒ Object
Histogram plugin’s method
79 80 81 82 83 84 85 86 87 |
# File 'lib/fluent/plugin/out_histogram.rb', line 79
# Builds a fresh histogram table.
#
# @param tags [Array<String>, nil] tags that should start with a zeroed
#   histogram; nil produces an empty table
# @return [Hash] tag => independent copy of @zero_hist
def initialize_hists(tags = nil)
  (tags || []).each_with_object({}) do |tag, hists|
    # Each tag gets its own copy so bins are never shared between tags.
    hists[tag] = @zero_hist.dup
  end
end
#shutdown ⇒ Object
70 71 72 73 74 |
# File 'lib/fluent/plugin/out_histogram.rb', line 70
# Stops the plugin: runs the parent shutdown, then terminates the watcher
# thread created in #start and waits for it to finish.
def shutdown
  super
  # Thread#terminate returns the thread itself, so the join can be chained.
  @watcher.terminate.join
end
#start ⇒ Object
53 54 55 56 |
# File 'lib/fluent/plugin/out_histogram.rb', line 53
# Starts the plugin: runs the parent start, then launches the background
# thread that executes #watch.
def start
  super
  @watcher = Thread.new { watch }
end
#tagging(flushed) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/fluent/plugin/out_histogram.rb', line 120
# Rewrites the tags of flushed data into the tags used for emission.
#
# When @tag is set it replaces every tag. Otherwise the configured input
# prefix is stripped (if the tag equals it or starts with it plus a dot),
# then the configured prefix and suffix are attached. After each step a
# leading or trailing dot is removed.
#
# @param flushed [Hash] input tag => data
# @return [Hash] output tag => data; when several inputs map to one output
#   tag, the last one wins
def tagging(flushed)
  edge_dots = /^\.|\.$/
  flushed.each_with_object({}) do |(tag, hist), tagged|
    out_tag = @tag
    unless out_tag
      out_tag = tag
      if @input_tag_remove_prefix
        strippable = out_tag == @input_tag_remove_prefix ||
                     (out_tag.start_with?(@remove_prefix_string) &&
                      out_tag.length > @remove_prefix_length)
        if strippable
          out_tag = out_tag[@input_tag_remove_prefix.length..-1].gsub(edge_dots, "")
        end
      end
      out_tag = (@tag_prefix_string + out_tag).gsub(edge_dots, "") if @tag_prefix
      out_tag = (out_tag + @tag_suffix_string).gsub(edge_dots, "") if @tag_suffix
    end
    tagged[out_tag] = hist
  end
end
#watch ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/fluent/plugin/out_histogram.rb', line 58
# Body of the watcher thread: polls the engine clock every half second and
# flushes whenever at least @flush_interval has passed since the last flush.
# Never returns on its own; #shutdown terminates the thread running it.
def watch
  @last_checked = Fluent::Engine.now
  while true
    sleep 0.5
    next unless Fluent::Engine.now - @last_checked >= @flush_interval

    # Use one timestamp both for the emitted events and as the new baseline.
    current = Fluent::Engine.now
    flush_emit(current)
    @last_checked = current
  end
end