39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
# File 'lib/fluent/plugin/filter_aggregate.rb', line 39
# Validates the plugin settings and builds the aggregation engine.
#
# After delegating to +super+, this method:
# * loads the +dataoperations-aggregate+ gem,
# * checks every interval after the first against a fixed list of accepted
#   values and requires it to be a multiple of the first interval,
# * checks each requested aggregation against +VALID_AGGREGATIONS+,
# * optionally restores previously saved state from
#   +@temporary_status_file_path+ (the file is deleted once loaded),
# * instantiates +DataOperations::Aggregate+ with the resulting settings.
#
# @param conf [Object] configuration object, passed through to +super+
# @return [DataOperations::Aggregate] the engine stored in +@data_operations+
# @raise [Fluent::ConfigError] if an interval or an aggregation name is invalid
def configure(conf)
  super
  # Required at configure time rather than at file load.
  require 'dataoperations-aggregate'

  # A non-nil @interval takes the place of @intervals.
  @intervals = @interval unless @interval.nil?
  # NOTE(review): these two values are not read inside this method;
  # presumably they are consumed elsewhere in the plugin - confirm.
  @hash_time_format = "%Y-%m-%dT%H"
  @interval_seconds = 3600

  accepted_intervals = [1, 5, 10, 20, 30, 60, 120, 180, 240, 300, 600, 900, 1200, 1800, 3600]
  base_interval = @intervals[0]
  # drop(1) yields [] for an empty list, whereas [1..-1] yields nil there.
  @intervals.drop(1).each do |interval|
    unless accepted_intervals.include?(interval)
      raise Fluent::ConfigError, "interval must set to 1s,5s,10s,20s,30s,1m,2m,3m,4m,5m,10m,15m,20m,30m,1h"
    end
    # Written as `! (a % b) == 0` this parses as `(!(a % b)) == 0`, which is
    # always false, so the multiple-of check never fired. `.zero?` avoids
    # the precedence trap. The message reports the value actually compared.
    unless (interval % base_interval).zero?
      raise Fluent::ConfigError, "interval must be multiple of default_aggregate_interval(#{base_interval}s)"
    end
  end

  @group_field_names = @group_fields
  @aggregate_field_names = @aggregate_fields
  @aggregation_names = @aggregations

  # Aggregator name is the local hostname, optionally followed by a suffix.
  @aggregator_name = "#{Socket.gethostname}"
  @aggregator_name = "#{@aggregator_name}-#{@aggregator_suffix_name}" unless @aggregator_suffix_name.nil?

  unless @aggregation_names.all? { |operation| VALID_AGGREGATIONS.include?(operation) }
    raise Fluent::ConfigError, "aggregations must set any combination of sum,min,max,mean,median,variance,standard_deviation,histogram"
  end

  if @aggregation_names.include?("histogram")
    if histogram_buckets.empty? || histogram_fields.empty?
      log.warn "histogram aggregation disabled, need histogram_buckets & histogram_fields parameters to work, please review documentation."
    else
      log.info "histogram aggregation enabled, bucket count histogram_fields with values (le|ge) histogram_buckets parameter"
    end
  end

  @aggregator = {}
  if load_temporarystatus_file_enabled && !@temporary_status_file_path.nil? && File.exist?(@temporary_status_file_path)
    begin
      # SECURITY(review): the status file is executed with eval, so anyone
      # able to write to that path can run arbitrary code in this process.
      # A data-only format would be safer, but that also requires changing
      # whatever writes the file, which is not visible here.
      # File.read closes the handle even when reading or eval fails.
      @aggregator = eval(File.read(@temporary_status_file_path))
      File.delete(@temporary_status_file_path)
      log.info "Temporary information loaded from temporary_status_file_path:#{@temporary_status_file_path} before startup"
    rescue StandardError, ScriptError => e
      # ScriptError covers the SyntaxError eval raises on a corrupt file.
      # Interrupt/SystemExit/NoMemoryError are deliberately not swallowed.
      log.warn "Failed to load temporary_status_file_path:#{@temporary_status_file_path}"
      log.warn e.message
      log.warn e.backtrace.inspect
    end
  end
  # Anything other than a Hash coming out of the status file is discarded.
  @aggregator = {} unless @aggregator.is_a?(Hash)
  log.warn "temporary_status_file_path is empty, is recomended using to avoid lost statistic information beetween restarts." if @temporary_status_file_path.nil?

  @aggregator_mutex = Mutex.new

  # Map the string settings onto the symbols handed to the engine.
  @processing_mode_type = @processing_mode == 'batch' ? :batch : :online
  # NOTE(review): :fist_event looks like a typo for :first_event. It is kept
  # unchanged because the symbol DataOperations::Aggregate expects cannot be
  # checked from this file - confirm against the gem before renaming.
  @time_started_mode_type = @time_started_mode == 'first_event' ? :fist_event : :last_event
  @histogram_bucket_comparation_mode = @histogram_bucket_comparation == 'less_or_equal' ? :less_or_equal : :greater_or_equal

  @data_operations = DataOperations::Aggregate.new(
    aggregator: @aggregator,
    time_format: @time_format,
    time_field: @time_field,
    output_time_format: @output_time_format,
    intervals: @intervals,
    flush_interval: @flush_interval,
    keep_interval: @keep_interval,
    field_no_data_value: @field_no_data_value,
    processing_mode: @processing_mode_type,
    time_started_mode: @time_started_mode_type,
    log: log,
    aggregator_name: @aggregator_name,
    aggregation_names: @aggregation_names,
    group_field_names: @group_field_names,
    aggregate_field_names: @aggregate_field_names,
    histogram_buckets: @histogram_buckets,
    histogram_fields: @histogram_fields,
    histogram_cumulative: @histogram_cumulative,
    histogram_bucket_infinite_enabled: @histogram_bucket_infinite_enabled,
    histogram_bucket_comparation: @histogram_bucket_comparation_mode
  )
end
|