79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
# File 'lib/fluent/plugin/in_wire_protocol_compat.rb', line 79
def on_message(msg, addr)
@parser.parse(msg) do |*parsed|
case parsed[0]
when String
tag, time, record = parsed
records = time.is_a?(Array) ? time : [ [ time, record ] ]
when Integer
tag = @tag
records = [ parsed ]
end
time, record = records.first
unless time && record
log.warn "pattern not match: #{msg.inspect}"
return
end
es = MultiEventStream.new
records.each do |time, record|
record[@source_host_key] = addr[3] if @source_host_key
es.add(time, record)
end
router.emit_stream(tag, es)
end
rescue => e
log.error msg.dump, :error => e, :error_class => e.class, :host => addr[3]
log.error_backtrace
end
|