Class: LogStash::Filters::Collate
- Defined in:
- lib/logstash/filters/collate.rb
Overview
Collate events by time or count.
The original goal of this filter was to merge the logs from different sources by the time of log, for example, in real-time log collection, logs can be collated by amount of 3000 logs or can be collated in 30 seconds.
The config looks like this:
filter {
collate {
size => 3000
interval => "30s"
order => "ascending"
}
}
Constant Summary
Constants inherited from Base
Constants included from Config::Mixin
Instance Attribute Summary
Attributes included from Config::Mixin
Attributes inherited from Plugin
Instance Method Summary collapse
Methods inherited from Base
#execute, #initialize, #threadsafe?
Methods included from Config::Mixin
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #teardown, #terminating?, #to_s
Constructor Details
This class inherits a constructor from LogStash::Filters::Base
Instance Method Details
permalink #filter(event) ⇒ Object
[View source]
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/logstash/filters/collate.rb', line 52 def filter(event) @logger.info("do collate filter") if event == LogStash::SHUTDOWN @job.trigger() @job.unschedule() @logger.info("collate filter thread shutdown.") return end # if the event is collated, a "collated" tag will be marked, so for those uncollated event, cancel them first. if event["tags"].nil? || !event..include?("collated") event.cancel else return end @mutex.synchronize{ @collatingArray.push(event.clone) if (@collatingArray.length == @count) collate end if (@collatingDone) while collatedEvent = @collatingArray.pop collatedEvent["tags"] = Array.new if collatedEvent["tags"].nil? collatedEvent["tags"] << "collated" filter_matched(collatedEvent) yield collatedEvent end # while @collatingArray.pop # reset collatingDone flag @collatingDone = false end } end |
permalink #flush ⇒ Object
[View source]
100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/logstash/filters/collate.rb', line 100 def flush events = [] if (@collatingDone) @mutex.synchronize{ while collatedEvent = @collatingArray.pop collatedEvent["tags"] << "collated" events << collatedEvent end # while @collatingArray.pop } # reset collatingDone flag. @collatingDone = false end return events end |
permalink #register ⇒ Object
[View source]
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/logstash/filters/collate.rb', line 35 def register require "thread" require "rufus/scheduler" @mutex = Mutex.new @collatingDone = false @collatingArray = Array.new @scheduler = Rufus::Scheduler.start_new @job = @scheduler.every @interval do @logger.info("Scheduler Activated") @mutex.synchronize{ collate } end end |