13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/logstash/inputs/s3snssqs/log_processor.rb', line 13
def process(record, logstash_event_queue)
file = record[:local_file]
codec = @codec_factory.get_codec(record)
folder = record[:folder]
type = @type_by_folder.fetch(record[:bucket],{})[folder]
metadata = {}
line_count = 0
event_count = 0
file_t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) read_file(file) do |line|
line_count += 1
if stop?
@logger.warn("[#{Thread.current[:name]}] Abort reading in the middle of the file, we will read it again when logstash is started")
throw :skip_delete
end
begin
codec.decode(line) do |event|
event_count += 1
decorate_event(event, metadata, type, record[:key], record[:bucket], record[:s3_data])
logstash_event_queue << event
end
rescue Exception => e
@logger.error("[#{Thread.current[:name]}] Unable to decode line", :line => line, :error => e)
end
end
file_t1 = Process.clock_gettime(Process::CLOCK_MONOTONIC) processing_time = (file_t1 - file_t0)
codec.flush do |event|
event_count += 1
decorate_event(event, metadata, type, record[:key], record[:bucket], record[:s3_data])
@logger.debug("[#{Thread.current[:name]}] Flushing an incomplete event", :event => event.to_s)
logstash_event_queue << event
end
return true
end
|