Class: LogStash::Filters::Collate

Inherits:
Base show all
Defined in:
lib/logstash/filters/collate.rb

Overview

Collate events by time or count.

The original goal of this filter was to merge logs from different sources by log time. For example, in real-time log collection, logs can be collated once 3000 of them have accumulated, or every 30 seconds.

The config looks like this:

filter {
  collate {
    size => 3000
    interval => "30s"
    order => "ascending"
  }
}

Constant Summary

Constants inherited from Base

Base::RESERVED

Constants included from Config::Mixin

Config::Mixin::CONFIGSORT

Instance Attribute Summary

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Instance Method Summary collapse

Methods inherited from Base

#execute, #initialize, #threadsafe?

Methods included from Config::Mixin

#config_init, included

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #teardown, #terminating?, #to_s

Constructor Details

This class inherits a constructor from LogStash::Filters::Base

Instance Method Details

#filter(event) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/logstash/filters/collate.rb', line 52

# Buffers an incoming event for collation and emits any batch that is ready.
#
# Behaviour, in order:
# * On the LogStash::SHUTDOWN sentinel the scheduled job is triggered once
#   more and then unscheduled; nothing is yielded.
# * An event that already carries the "collated" tag is left untouched.
# * Any other event is cancelled and a clone of it is appended to the
#   internal buffer. When the buffer has reached @count events, `collate`
#   (defined elsewhere in this class) is invoked.
# * Whenever @collatingDone is set, the buffer is drained from its tail: each
#   event is tagged "collated", passed to `filter_matched`, and yielded.
#
# The buffer is only touched while @mutex is held; note that the caller's
# block therefore runs with the lock held as well.
#
# event - the event to process, or LogStash::SHUTDOWN.
#
# Yields each collated event once a batch is released.
def filter(event)
  @logger.info("do collate filter")
  if event == LogStash::SHUTDOWN
    # Run the scheduled job one last time before cancelling it.
    @job.trigger()
    @job.unschedule()
    @logger.info("collate filter thread shutdown.")
    return
  end

  # Collated events are marked with a "collated" tag when they are emitted;
  # such events pass through unchanged. Everything else is cancelled here and
  # re-emitted later as part of a collated batch.
  return unless event["tags"].nil? || !event.tags.include?("collated")

  event.cancel

  @mutex.synchronize do
    @collatingArray.push(event.clone)

    # `>=` rather than `==`: if the buffer ever grows past the threshold
    # (e.g. an earlier collate attempt raised), the next event must still
    # trigger collation instead of waiting for the timer.
    collate if @collatingArray.length >= @count

    if @collatingDone
      while (collated_event = @collatingArray.pop)
        collated_event["tags"] = [] if collated_event["tags"].nil?
        collated_event["tags"] << "collated"
        filter_matched(collated_event)
        yield collated_event
      end
      # Reset the flag so the next batch starts accumulating afresh.
      @collatingDone = false
    end
  end
end

#flush ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/logstash/filters/collate.rb', line 100

# Drains the collation buffer if a collation pass has completed.
#
# When @collatingDone is set, every buffered event is popped from the tail of
# @collatingArray, tagged "collated" and collected; the flag is then cleared.
# The flag is checked and cleared while holding @mutex so that a collation
# signalled by another thread between the drain and the reset is not lost.
#
# NOTE(review): unlike #filter, this path does not call filter_matched on the
# released events -- confirm whether that is intentional.
#
# Returns an Array of the released events (empty when nothing is ready).
def flush
  events = []
  @mutex.synchronize do
    if @collatingDone
      while (collated_event = @collatingArray.pop)
        # Events may arrive without any tags; create the list before appending.
        collated_event["tags"] = [] if collated_event["tags"].nil?
        collated_event["tags"] << "collated"
        events << collated_event
      end
      # Reset the flag so the next batch starts accumulating afresh.
      @collatingDone = false
    end
  end
  events
end

#register ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/logstash/filters/collate.rb', line 35

# Prepares the filter's state and starts the periodic collation job.
#
# Loads the threading and scheduler libraries, initialises the buffer, the
# "collation finished" flag and the mutex guarding them, then schedules a job
# that runs every @interval and calls `collate` while holding the mutex.
def register
  require "thread"
  require "rufus/scheduler"

  # Shared state, accessed from both the filter path and the scheduler job.
  @collatingArray = []
  @collatingDone = false
  @mutex = Mutex.new

  @scheduler = Rufus::Scheduler.start_new
  @job = @scheduler.every(@interval) do
    @logger.info("Scheduler Activated")
    @mutex.synchronize { collate }
  end
end