Class: LogStash::FilterWorker

Inherits:
Plugin
  • Object
show all
Includes:
Stud
Defined in:
lib/logstash/filterworker.rb

Overview

TODO(sissel): Should this really be a ‘plugin’?

Constant Summary collapse

Exceptions =
[Exception]

Instance Attribute Summary collapse

Attributes inherited from Plugin

#params

Instance Method Summary collapse

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s

Constructor Details

#initialize(filters, input_queue, output_queue) ⇒ FilterWorker

Returns a new instance of FilterWorker.



18
19
20
21
22
23
# File 'lib/logstash/filterworker.rb', line 18

# Builds a worker bound to a filter chain and its input/output queues.
#
# @param filters the filters to apply; stored in @filters and iterated with #each
# @param input_queue the queue events are popped from
# @param output_queue the queue surviving events are pushed onto
def initialize(filters, input_queue, output_queue)
  @filters, @input_queue, @output_queue = filters, input_queue, output_queue
  # Shutdown flag; starts cleared so the worker is allowed to run.
  @shutdown_requested = false
end

Instance Attribute Details

#after_filter(&block) ⇒ Object (readonly)

This block is called after each filter has finished processing an event. The filtered event and the filter that processed it are passed to the block. This could be used to add metrics in the future.



28
29
30
# File 'lib/logstash/filterworker.rb', line 28

# Registers or reads the hook that is invoked after each filter has
# processed an event.
#
# When called with a block, the block is stored as the hook. When called
# without one, the stored hook is left untouched. Either way the current
# hook is returned. The hook is invoked as `hook.call(event, filter)`,
# where `filter` is the filter object that just processed `event`.
#
# @yieldparam event the event that was just filtered
# @yieldparam filter the filter that processed the event
# @return [Proc, nil] the currently registered hook, or nil if none is set
def after_filter(&block)
  # Only overwrite on an explicit block so that plain reads stay side-effect free.
  @after_filter = block if block
  @after_filter
end

#filters ⇒ Object

Returns the value of attribute filters.



12
13
14
# File 'lib/logstash/filterworker.rb', line 12

# Reader for the filter chain this worker applies.
#
# @return the value of @filters, i.e. the collection handed to #initialize
def filters
  @filters
end

#logger ⇒ Object

Returns the value of attribute logger.



11
12
13
# File 'lib/logstash/filterworker.rb', line 11

# Reader for the logger this worker reports to.
#
# NOTE(review): @logger is not assigned anywhere in the code visible here;
# presumably it is set through a writer or by whoever creates the worker.
# Confirm before relying on it being non-nil.
#
# @return the value of @logger
def logger
  @logger
end

Instance Method Details

#filter(original_event) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/logstash/filterworker.rb', line 82

# Runs one event through every configured filter, in order, and pushes it
# onto the output queue unless it was cancelled along the way.
#
# A filter may yield additional events from #execute (the 'split' filter
# does this, for example). Those are appended to the work list and are
# themselves run through the whole filter chain.
#
# An exception raised by a filter is logged as a warning and does not stop
# the remaining filters from seeing the event.
#
# @param original_event the event to filter
# @return [Array] the work list: the original event followed by every
#   event generated while filtering
def filter(original_event)
  # Work list. It is intentionally appended to while being iterated so that
  # generated events are picked up by this same loop.
  pending = [original_event]

  pending.each do |current|
    @filters.each do |plugin|
      # TODO(sissel): This may require some refactoring later, I am not sure
      # this is the best approach. The goal is to let filters modify the
      # current event and, if necessary, create new events based on it.
      begin
        update_watchdog(:event => current, :filter => plugin)
        plugin.execute(current) { |generated| pending << generated }
      rescue *Exceptions => e
        @logger.warn("Exception during filter", :event => current,
                     :exception => e, :backtrace => e.backtrace,
                     :filter => plugin)
      ensure
        clear_watchdog
      end

      # A cancelled event skips the after-filter hook and all later filters.
      if current.cancelled?
        if @logger.debug?
          @logger.debug("Event cancelled", :event => current,
                                           :filter => plugin.class)
        end
        break
      end

      @after_filter.call(current, plugin) unless @after_filter.nil?
    end

    if @logger.debug?
      @logger.debug("Event finished filtering", :event => current,
                                                :thread => Thread.current[:name])
    end
    @output_queue.push(current) unless current.cancelled?
  end
end

#flusher ⇒ Object

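Performs one flush pass: for each filter that responds to `flush`, collects the events it returns, runs them through the filters that come later in the chain, and pushes every event that was not cancelled onto the output queue.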



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/logstash/filterworker.rb', line 52

# Performs one flush pass over the filter chain.
#
# Walks the filters in order. Events flushed by earlier filters are first
# passed to the current filter (via #filter, followed by the after-filter
# hook), then the current filter is asked to flush, if it responds to
# #flush, and anything it returns joins the list. Finally every event that
# was not cancelled is pushed onto the output queue.
#
# @return [Array] all events flushed during this pass, cancelled or not
def flusher
  flushed_events = []

  @filters.each do |plugin|
    # Run everything flushed so far through this filter.
    # TODO(sissel): watchdog on flush filtration?
    flushed_events.each do |event|
      next if event.cancelled?

      plugin.filter(event)
      @after_filter.call(event, plugin) unless @after_filter.nil?
    end

    # TODO(sissel): watchdog on flushes?
    next unless plugin.respond_to?(:flush)

    batch = plugin.flush
    next if batch.nil?

    flushed_events.concat(batch) if batch.any?
  end

  flushed_events.each do |event|
    @logger.debug("Pushing flushed events", :event => event) if @logger.debug?
    @output_queue.push(event) unless event.cancelled?
  end
end

#run ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/logstash/filterworker.rb', line 32

# Main loop of the worker.
#
# Starts the periodic flusher thread, then pops events from the input queue
# and filters them until one of the following happens:
#
# * a LogStash::SHUTDOWN marker is popped: #finished is called, the marker
#   is put back on the input queue for the next filter thread, and the
#   method returns nil;
# * the shutdown flag is set, or the queue yields nil/false: the loop ends
#   and the method returns whatever #finished returns.
#
# NOTE(review): nothing in the code visible here stops or joins the
# @flusher thread when the loop ends. Confirm whether that is handled
# elsewhere.
def run
  # TODO(sissel): Run a flusher thread for each plugin requesting flushes
  # > It seems reasonable that you could want a multiline filter to flush
  #   after 5 seconds, but want a metrics filter to flush every 10 or 60.
  @flusher = Thread.new { interval(5) { flusher } }

  until @shutdown_requested
    event = @input_queue.pop
    break unless event

    if event == LogStash::SHUTDOWN
      finished
      @input_queue << LogStash::SHUTDOWN # for the next filter thread
      return
    end

    filter(event)
  end

  finished
end

#teardown ⇒ Object

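Requests shutdown by setting the `@shutdown_requested` flag, which the `run` loop checks before popping the next event.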



78
79
80
# File 'lib/logstash/filterworker.rb', line 78

# Requests that the worker stop by setting the shutdown flag.
#
# The flag is only tested by #run before it pops the next event, so this
# does not interrupt an event that is currently being filtered.
# NOTE(review): if @input_queue.pop blocks while the queue is empty, the
# flag will not be noticed until another item arrives. Confirm against
# the queue type in use.
def teardown
  @shutdown_requested = true
end