Class: LogStash::Filters::Collate
- Defined in:
- lib/logstash/filters/collate.rb
Overview
Collate events by time or count.
The original goal of this filter was to merge logs from different sources according to their log time. For example, in real-time log collection, logs can be collated in batches of 3000 events or once every 30 seconds.
The config looks like this:
filter {
collate {
size => 3000
interval => "30s"
order => "ascending"
}
}
Constant Summary
Constants inherited from Base
Constants included from Config::Mixin
Instance Attribute Summary
Attributes included from Config::Mixin
Attributes inherited from Plugin
Instance Method Summary collapse
Methods inherited from Base
#execute, #initialize, #threadsafe?
Methods included from Config::Mixin
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #teardown, #terminating?, #to_s
Constructor Details
This class inherits a constructor from LogStash::Filters::Base
Instance Method Details
#filter(event) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/logstash/filters/collate.rb', line 52

# Buffers each incoming event and re-emits buffered events once a collation
# pass has completed.
#
# Events that do not yet carry the "collated" tag are cancelled and a clone
# of them is pushed onto the internal buffer. When the buffer length reaches
# @count, `collate` is invoked. Whenever @collatingDone is set, every
# buffered event is popped, tagged "collated", passed to `filter_matched`
# and yielded to the caller's block.
#
# NOTE(review): the batch threshold is read from @count, while the example
# in the class overview sets `size` — confirm which option name is correct.
#
# @param event the event to process, or LogStash::SHUTDOWN to stop the
#   scheduled job
# @yield [collated_event] each buffered event as it is released
def filter(event)
  @logger.info("do collate filter")

  if event == LogStash::SHUTDOWN
    # Run the scheduled job one last time before removing it from the scheduler.
    @job.trigger
    @job.unschedule
    @logger.info("collate filter thread shutdown.")
    return
  end

  # Events already tagged "collated" were released by this filter earlier;
  # let them through untouched so they are not buffered a second time.
  tags = event["tags"]
  return if tags && tags.include?("collated")

  # The original event is cancelled; a clone of it is what gets buffered.
  event.cancel

  @mutex.synchronize do
    @collatingArray.push(event.clone)
    collate if @collatingArray.length == @count

    if @collatingDone
      while (collated_event = @collatingArray.pop)
        collated_event["tags"] = [] if collated_event["tags"].nil?
        collated_event["tags"] << "collated"
        filter_matched(collated_event)
        yield collated_event
      end
      # Reset the flag so the next batch starts buffering again.
      @collatingDone = false
    end
  end
end
#flush ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/logstash/filters/collate.rb', line 100

# Drains the buffer after a collation pass has completed.
#
# When @collatingDone is set, every buffered event is popped, tagged
# "collated" and collected into the returned array; the flag is then
# cleared. When it is not set, nothing is drained.
#
# The flag is checked and reset while holding @mutex, the same lock that
# guards the buffer, so a completion signalled while draining cannot be
# lost.
#
# @return [Array] the released events, or an empty array
def flush
  events = []

  @mutex.synchronize do
    if @collatingDone
      while (collated_event = @collatingArray.pop)
        # Events may arrive without a "tags" field; create it before appending.
        collated_event["tags"] = [] if collated_event["tags"].nil?
        collated_event["tags"] << "collated"
        events << collated_event
      end
      # Reset the flag so the next batch starts buffering again.
      @collatingDone = false
    end
  end

  events
end
#register ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/logstash/filters/collate.rb', line 35

# Prepares the filter's state and starts the periodic collation job.
#
# Loads the threading and scheduler libraries, creates the mutex, the
# "collation done" flag and the event buffer, then schedules a job that
# calls `collate` under the mutex every @interval.
def register
  # Loaded here rather than at file level, as in the original.
  require "thread"
  require "rufus/scheduler"

  @collatingArray = Array.new
  @collatingDone = false
  @mutex = Mutex.new

  @scheduler = Rufus::Scheduler.start_new
  @job = @scheduler.every(@interval) do
    @logger.info("Scheduler Activated")
    @mutex.synchronize do
      collate
    end
  end
end