Class: Fluent::Plugin::SplitBySizeFilter
- Inherits: Filter
- Ancestor chain:
- Object
- Filter
- Fluent::Plugin::SplitBySizeFilter
show all
- Defined in:
- lib/fluent/plugin/filter_split_by_size.rb
Defined Under Namespace
Classes: KeyNotFoundError, SkipRecordError
Instance Method Summary
collapse
Instance Method Details
50
51
52
53
|
# File 'lib/fluent/plugin/filter_split_by_size.rb', line 50
# Completes plugin configuration and derives the per-record size limit.
#
# Delegates to the parent implementation first, then stores
# +@max_event_size+ minus a fixed 200 in +@max_record_size+, which is the
# threshold +split_event+ compares against.
# NOTE(review): the 200 is presumably headroom for data added around each
# split record (e.g. the id field) -- confirm the intended meaning.
#
# @param conf the configuration object, forwarded unchanged to +super+
# @return the value assigned to +@max_record_size+
def configure(conf)
  super
  margin = 200
  @max_record_size = @max_event_size - margin
end
|
#filter_stream(tag, es) ⇒ Object
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
# File 'lib/fluent/plugin/filter_split_by_size.rb', line 55
# Splits every event of +es+ into size-limited events that all carry the
# original id.
#
# For each (time, record) pair the value under +@id_field+ is set aside, the
# remaining fields are passed to +split_event+, and every resulting piece is
# tagged with the id again and added to the returned stream at the same time.
#
# The incoming record is never modified: the split works on a shallow copy.
# This way, when anything below raises, the record handed to
# +router.emit_error_event+ is still complete (including its id), and the
# caller's event stream is left intact.
#
# @param tag the tag of the incoming events; only used for error events
# @param es an enumerable yielding (time, record) pairs
# @return [MultiEventStream] a new stream holding the split records
#
# Errors are not propagated: a record without +@id_field+ produces a
# +KeyNotFoundError+, and that or any other StandardError raised while
# processing a record is routed to +router.emit_error_event+ together with
# the original record, after which processing continues with the next event.
def filter_stream(tag, es)
  new_es = MultiEventStream.new
  es.each do |time, record|
    begin
      id = record[@id_field]
      raise KeyNotFoundError.new(@id_field, record) if id.nil?

      # Work on a copy so the original record keeps its id for error
      # reporting and for anyone else holding a reference to it.
      payload = record.dup
      payload.delete(@id_field)

      split_event(payload).each do |part|
        part[@id_field] = id
        new_es.add(time, part)
      end
    rescue => e
      router.emit_error_event(tag, time, record, e)
    end
  end
  new_es
end
|
#size_of_values(record) ⇒ Object
73
74
75
|
# File 'lib/fluent/plugin/filter_split_by_size.rb', line 73
# Measures how large +record+ is once serialized with Yajl.
#
# Despite the method name, the figure is the byte length of the whole
# encoded record as produced by +Yajl.dump+, not just of its values.
#
# @param record the object handed to +Yajl.dump+
# @return [Integer] number of bytes in the encoded output
def size_of_values(record)
  encoded = Yajl.dump(record)
  encoded.bytesize
end
|
#split_event(record) ⇒ Object
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
# File 'lib/fluent/plugin/filter_split_by_size.rb', line 77
# Recursively breaks +record+ into pieces that each fit +@max_record_size+.
#
# A record whose +size_of_values+ does not exceed the limit is returned as
# the sole element of the result. A larger record with more than one key is
# cut in two via +record.split_into(2)+ and each half is processed the same
# way. A larger record that cannot be cut further (at most one key) is
# dropped: a warning is logged and nothing is returned for it.
#
# @param record the record to split; must respond to +keys+, +values+ and
#   +split_into+
# @return [Array] the pieces that fit, in the order produced by the splits;
#   empty when everything had to be dropped
def split_event(record)
  return [record] unless size_of_values(record) > @max_record_size

  unless record.keys.count > 1
    # A single oversized pair cannot be reduced any further.
    log.warn('Key/Value pair is too large: "%s:%s". Max size is: %s, dropping!' % [record.keys[0], record.values[0], @max_record_size])
    return []
  end

  pieces = []
  record.split_into(2).each { |half| pieces.concat(split_event(half)) }
  pieces
end
|