Class: LogStash::FilterWorker
- Includes: Stud
- Defined in: lib/logstash/filterworker.rb
Overview
TODO(sissel): Should this really be a 'plugin'?
Constant Summary
- Exceptions = [Exception]
Instance Attribute Summary
- #after_filter(&block) ⇒ Object (readonly)
  This block is called after each filter is done on an event.
- #filters ⇒ Object
  Returns the value of attribute filters.
- #logger ⇒ Object
  Returns the value of attribute logger.
Attributes inherited from Plugin
Instance Method Summary
- #filter(original_event) ⇒ Object
- #flusher ⇒ Object
  Flushes each filter and pushes the resulting events to the output queue.
- #initialize(filters, input_queue, output_queue) ⇒ FilterWorker (constructor)
  A new instance of FilterWorker.
- #run ⇒ Object
- #teardown ⇒ Object
  Requests shutdown of the worker loop.
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s
Constructor Details
#initialize(filters, input_queue, output_queue) ⇒ FilterWorker
Returns a new instance of FilterWorker.
# File 'lib/logstash/filterworker.rb', line 18
def initialize(filters, input_queue, output_queue)
  @filters = filters
  @input_queue = input_queue
  @output_queue = output_queue
  @shutdown_requested = false
end
Instance Attribute Details
#after_filter(&block) ⇒ Object (readonly)
This block is called after each filter is done on an event. The filtered event and the filter that processed it are passed to the block. This could be used to add metrics in the future?
# File 'lib/logstash/filterworker.rb', line 28
def after_filter
  @after_filter
end
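The metrics idea mentioned above can be sketched as a callable that counts how often each filter ran. This is a minimal illustration, not part of LogStash: FilterCounter, UpcaseFilter and TagFilter are hypothetical names, and the only thing taken from the source is the call shape used in #filter, call(event, filter).

```ruby
# Hypothetical filter classes, used only to have something to count.
class UpcaseFilter; end
class TagFilter; end

# Hypothetical metrics collector. It responds to call(event, filter),
# matching how #filter invokes the after_filter block.
class FilterCounter
  attr_reader :counts

  def initialize
    @counts = Hash.new(0)
  end

  # Count one invocation per filter class; the event is ignored here.
  def call(event, filter)
    @counts[filter.class.name] += 1
  end
end

COUNTER = FilterCounter.new
[UpcaseFilter.new, TagFilter.new, UpcaseFilter.new].each do |filter|
  COUNTER.call({ "message" => "hello" }, filter)
end
```

Keying on the class name keeps the counts readable even when several instances of the same filter are configured.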
#filters ⇒ Object
Returns the value of attribute filters.
# File 'lib/logstash/filterworker.rb', line 12
def filters
  @filters
end
#logger ⇒ Object
Returns the value of attribute logger.
# File 'lib/logstash/filterworker.rb', line 11
def logger
  @logger
end
Instance Method Details
#filter(original_event) ⇒ Object
# File 'lib/logstash/filterworker.rb', line 82
def filter(original_event)
  # Make an 'events' array that filters can push onto if they
  # need to generate additional events based on the current event.
  # The 'split' filter does this, for example.
  events = [original_event]

  events.each do |event|
    @filters.each do |filter|
      # Filter can emit multiple events, like the 'split' event, so
      # give the input queue to dump generated events into.

      # TODO(sissel): This may require some refactoring later, I am not sure
      # this is the best approach. The goal is to allow filters to modify
      # the current event, but if necessary, create new events based on
      # this event.
      begin
        update_watchdog(:event => event, :filter => filter)
        filter.execute(event) do |newevent|
          events << newevent
        end
      rescue *Exceptions => e
        @logger.warn("Exception during filter", :event => event,
                     :exception => $!, :backtrace => e.backtrace,
                     :filter => filter)
      ensure
        clear_watchdog
      end

      if event.cancelled?
        @logger.debug? and @logger.debug("Event cancelled", :event => event,
                                         :filter => filter.class)
        break
      end
      @after_filter.call(event,filter) unless @after_filter.nil?
    end # @filters.each

    @logger.debug? and @logger.debug("Event finished filtering", :event => event,
                                     :thread => Thread.current[:name])
    @output_queue.push(event) unless event.cancelled?
  end # events.each
end
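The method above relies on Array#each visiting elements that are appended while the iteration is still running, which is how events generated by a filter get filtered in turn. The following sketch isolates that pattern. The split_filter and process helpers are hypothetical stand-ins that operate on plain strings, not LogStash events.

```ruby
# Hypothetical stand-in for a 'split'-style filter: yields one new
# string per comma-separated part, and nothing for single values.
def split_filter(event)
  parts = event.split(",")
  return if parts.size < 2
  parts.each { |part| yield part }
end

# Process the original event plus everything generated from it.
def process(original)
  events = [original]
  seen = []
  events.each do |event|
    # New events are appended to the array being iterated, so the
    # same loop picks them up after the current element.
    split_filter(event) { |new_event| events << new_event }
    seen << event
  end
  seen
end
```

Calling process("a,b,c") visits the combined string first and then each generated part, because the array grows while the loop is still walking it.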
#flusher ⇒ Object
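Runs the events flushed by each filter through the filters that follow it, then pushes the events that were not cancelled to the output queue.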
# File 'lib/logstash/filterworker.rb', line 52
def flusher
  events = []
  @filters.each do |filter|

    # Filter any events generated so far in this flush.
    events.each do |event|
      # TODO(sissel): watchdog on flush filtration?
      unless event.cancelled?
        filter.filter(event)
        @after_filter.call(event,filter) unless @after_filter.nil?
      end
    end

    # TODO(sissel): watchdog on flushes?
    if filter.respond_to?(:flush)
      flushed = filter.flush
      events += flushed if !flushed.nil? && flushed.any?
    end
  end

  events.each do |event|
    @logger.debug? and @logger.debug("Pushing flushed events", :event => event)
    @output_queue.push(event) unless event.cancelled?
  end
end
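One consequence of the loop above is that flushed events pass only through the filters that come after the filter that flushed them, so filter order matters. The sketch below reproduces that cascade with hypothetical stand-ins (BufferingFilter, TaggingFilter, flush_all) and plain hashes as events. It leaves out the cancellation checks, the after_filter callback and the output queue.

```ruby
# Hypothetical filter that holds events back until flush is called.
class BufferingFilter
  def initialize(pending)
    @pending = pending
  end

  def filter(event); end

  # Hand over everything buffered so far and start with an empty buffer.
  def flush
    flushed, @pending = @pending, []
    flushed
  end
end

# Hypothetical filter without a flush method; it only tags events.
class TaggingFilter
  def filter(event)
    event[:tags] << "tagged"
  end
end

# Same cascade as the flusher method: each filter first sees the events
# flushed by earlier filters, then contributes its own flushed events.
def flush_all(filters)
  events = []
  filters.each do |filter|
    events.each { |event| filter.filter(event) }
    if filter.respond_to?(:flush)
      flushed = filter.flush
      events += flushed if !flushed.nil? && flushed.any?
    end
  end
  events
end
```

With the buffering filter first, its flushed event is tagged by the filter that follows. With the order reversed, the flushed event reaches the end of the list untouched.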
#run ⇒ Object
# File 'lib/logstash/filterworker.rb', line 32
def run
  # TODO(sissel): Run a flusher thread for each plugin requesting flushes
  # > It seems reasonable that you could want a multiline filter to flush
  #   after 5 seconds, but want a metrics filter to flush every 10 or 60.

  # Set up the periodic flusher thread.
  @flusher = Thread.new { interval(5) { flusher } }

  while !@shutdown_requested && event = @input_queue.pop
    if event == LogStash::SHUTDOWN
      finished
      @input_queue << LogStash::SHUTDOWN # for the next filter thread
      return
    end

    filter(event)
  end # while @input_queue.pop
  finished
end
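The shutdown handling above puts the sentinel back on the input queue so that every worker sharing that queue eventually sees it. A minimal sketch of that hand-off, using only Ruby's standard Queue and Thread, is shown here. SHUTDOWN, worker_loop and run_workers are hypothetical stand-ins, and upcasing a string takes the place of real filtering.

```ruby
require "thread"

# Hypothetical stand-in for LogStash::SHUTDOWN.
SHUTDOWN = :shutdown

# One worker: process events until the sentinel arrives, then pass the
# sentinel on so the next worker blocked on the same queue can stop too.
def worker_loop(input_queue, output_queue)
  while (event = input_queue.pop)
    if event == SHUTDOWN
      input_queue << SHUTDOWN # for the next worker thread
      return
    end
    output_queue << event.upcase
  end
end

# Start several workers on one shared queue and stop them all with a
# single sentinel pushed after the events.
def run_workers(events, worker_count)
  input = Queue.new
  output = Queue.new
  events.each { |event| input << event }
  input << SHUTDOWN

  workers = Array.new(worker_count) do
    Thread.new { worker_loop(input, output) }
  end
  workers.each(&:join)

  results = []
  results << output.pop until output.empty?
  results.sort
end
```

A single sentinel is enough for any number of workers, at the cost of leaving one sentinel on the queue after the last worker exits.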
#teardown ⇒ Object
Requests shutdown by setting @shutdown_requested to true.
# File 'lib/logstash/filterworker.rb', line 78
def teardown
  @shutdown_requested = true
end
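As the loop in #run is written, the flag set here is tested only before each blocking pop, so a worker that is already waiting on an empty queue appears to notice the request only after one more event arrives. The sketch below demonstrates that timing with a hypothetical miniature of the loop (FlagLoop and demo_teardown are illustrative names, not LogStash code).

```ruby
require "thread"

# Hypothetical miniature of the worker loop.
class FlagLoop
  attr_reader :handled

  def initialize(queue)
    @queue = queue
    @shutdown_requested = false
    @handled = []
  end

  def run
    # The flag is consulted only before each blocking pop.
    while !@shutdown_requested && (event = @queue.pop)
      @handled << event
    end
  end

  def teardown
    @shutdown_requested = true
  end
end

def demo_teardown
  queue = Queue.new
  worker = FlagLoop.new(queue)
  thread = Thread.new { worker.run }

  queue << :first
  # Wait until the first event is handled and the worker is parked in pop.
  sleep 0.01 until worker.handled.size == 1 && queue.num_waiting == 1

  worker.teardown
  still_running = thread.alive? # blocked in pop, so the flag is not seen yet

  queue << :second # wakes the worker one more time
  thread.join
  [still_running, worker.handled]
end
```

In this sketch the worker is still alive right after teardown and handles one further event before the loop condition stops it.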