52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/logstash/filters/collate.rb', line 52
# Buffers incoming events and re-emits them in batches tagged "collated".
#
# Behaviour, step by step:
# * LogStash::SHUTDOWN: the scheduled @job is triggered one last time and
#   then unscheduled; nothing is buffered or yielded.
# * An event whose tags already include "collated" is left untouched and
#   the method returns immediately, so re-emitted events are not buffered
#   a second time.
# * Any other event is cancelled and a clone of it is pushed onto
#   @collatingArray. Once the buffer holds exactly @count events,
#   `collate` is called.
# * If @collatingDone is set, the buffer is drained with `pop` (last
#   element first); each drained event is tagged "collated", handed to
#   `filter_matched`, and yielded. The flag is then cleared.
#
# NOTE(review): @collatingDone is never set to true in this method;
# presumably `collate` and/or the scheduled @job set it - confirm against
# their definitions.
#
# @param event the event to process, or the LogStash::SHUTDOWN sentinel
# @yield [collated_event] each buffered event as it is released
def filter(event)
  @logger.info("do collate filter")

  if event == LogStash::SHUTDOWN
    @job.trigger
    @job.unschedule
    @logger.info("collate filter thread shutdown.")
    return
  end

  # Read the tags once, through the same accessor for the nil check and the
  # membership test, so the guard always protects the lookup.
  tags = event["tags"]
  return if tags && tags.include?("collated")

  # Uncollated events are withdrawn here; a clone is re-emitted later.
  event.cancel

  @mutex.synchronize do
    @collatingArray.push(event.clone)
    collate if @collatingArray.length == @count

    if @collatingDone
      while (collated_event = @collatingArray.pop)
        collated_event["tags"] ||= []
        collated_event["tags"] << "collated"
        filter_matched(collated_event)
        yield collated_event
      end
      # Reset only after the buffer is fully drained.
      @collatingDone = false
    end
  end
end
|