50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
# File 'lib/fluent/plugin/filter_filter_split.rb', line 50
# Splits every record whose +@split_key+ field is an Array into one event per
# array element and returns the resulting event stream.
#
# Per-record behavior:
# * Records without +@split_key+ are passed through unchanged.
# * Records whose +@split_key+ value is not an Array are passed through
#   unchanged and a warning is logged.
# * Otherwise each element becomes its own event with the original time.
#   Hash elements are used as the event body; any other element is wrapped
#   as <tt>{ @split_key => element }</tt> (String key). The extra key/values
#   are then merged on top, so they win on key collisions.
#
# The extra key/values come from +other_keyvalues+ when +@keep_other_key+ is
# truthy, otherwise from +remained_keyvalues+ (both defined outside this
# method; their exact contents are not visible here).
#
# Any StandardError raised while handling a record is routed to the error
# stream via +router.emit_error_event+ and logged; processing then continues
# with the next record.
#
# @param tag [String] tag of the incoming stream, used only for error events
# @param es [#each] event stream yielding +time, record+ pairs
# @return [Fluent::MultiEventStream] the filtered/split events
def filter_stream(tag, es)
  new_es = Fluent::MultiEventStream.new
  es.each do |time, record|
    begin
      unless record.key?(@split_key)
        new_es.add(time, record)
        next
      end

      keyvalues = @keep_other_key ? other_keyvalues(record) : remained_keyvalues(record)

      values = record[@split_key]
      unless values.is_a?(Array)
        new_es.add(time, record)
        log.warn "failed to split with <#{@split_key}> key because the target field is not Array: <#{values}>."
        next
      end

      values.each do |v|
        # Always wrap scalars under a String key so the emitted shape does not
        # depend on whether there are extra key/values to merge.
        event = v.is_a?(Hash) ? v : { @split_key.to_s => v }
        # Non-destructive merge: the element inside the incoming record must
        # not be modified as a side effect of splitting.
        event = event.merge(keyvalues) unless keyvalues.empty?
        new_es.add(time, event)
      end
    rescue => e
      router.emit_error_event(tag, time, record, e)
      log.warn "failed to split with <#{@split_key}> key.", error: e
    end
  end
  new_es
end
|