Class: LogStash::Filters::Collate

Inherits:
Base show all
Defined in:
lib/logstash/filters/collate.rb

Overview

Collate events by time or count.

The original goal of this filter was to merge the logs from different sources by the time of log, for example, in real-time log collection, logs can be collated by amount of 3000 logs or can be collated in 30 seconds.

The config looks like this:

filter {
  collate {
    size => 3000
    interval => "30s"
    order => "ascending"
  }
}

Constant Summary

Constants inherited from Base

Base::RESERVED

Constants included from Config::Mixin

Config::Mixin::CONFIGSORT

Instance Attribute Summary

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Instance Method Summary collapse

Methods inherited from Base

#execute, #initialize, #threadsafe?

Methods included from Config::Mixin

#config_init, included

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #teardown, #terminating?, #to_s

Constructor Details

This class inherits a constructor from LogStash::Filters::Base

Instance Method Details

#filter(event) ⇒ Object

[View source]

52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/logstash/filters/collate.rb', line 52

def filter(event)
  @logger.info("do collate filter")
  if event == LogStash::SHUTDOWN
    @job.trigger()
    @job.unschedule()
    @logger.info("collate filter thread shutdown.")
    return
  end

  # if the event is collated, a "collated" tag will be marked, so for those uncollated event, cancel them first.
  if event["tags"].nil? || !event.tags.include?("collated")
    event.cancel
  else
    return
  end

  @mutex.synchronize{
    @collatingArray.push(event.clone)

    if (@collatingArray.length == @count)
      collate
    end

    if (@collatingDone)
      while collatedEvent = @collatingArray.pop
        collatedEvent["tags"] = Array.new if collatedEvent["tags"].nil?
        collatedEvent["tags"] << "collated"
        filter_matched(collatedEvent)
        yield collatedEvent
      end # while @collatingArray.pop
      # reset collatingDone flag
      @collatingDone = false
    end
  }
end

#flushObject

[View source]

100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/logstash/filters/collate.rb', line 100

def flush
  events = []
  if (@collatingDone)
    @mutex.synchronize{
      while collatedEvent = @collatingArray.pop
        collatedEvent["tags"] << "collated"
        events << collatedEvent
      end # while @collatingArray.pop
    }
    # reset collatingDone flag.
    @collatingDone = false
  end
  return events
end

#registerObject

[View source]

35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/logstash/filters/collate.rb', line 35

def register
  require "thread"
  require "rufus/scheduler"

  @mutex = Mutex.new
  @collatingDone = false
  @collatingArray = Array.new
  @scheduler = Rufus::Scheduler.start_new
  @job = @scheduler.every @interval do
    @logger.info("Scheduler Activated")
    @mutex.synchronize{
      collate
    }
  end
end