Class: LogStash::Filters::Elasticsearch

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/filters/elasticsearch.rb

Overview

Search elasticsearch for a previous log event and copy some fields from it into the current event. Below is a complete example of how this filter might be used. Whenever logstash receives an “end” event, it uses this elasticsearch filter to find the matching “start” event based on some operation identifier. Then it copies the @timestamp field from the “start” event into a new field on the “end” event. Finally, using a combination of the “date” filter and the “ruby” filter, we calculate the time duration in hours between the two events.

if [type] == "end" {
   elasticsearch {
      hosts => ["es-server"]
      query => "type:start AND operation:%{[opid]}"
      fields => ["@timestamp", "started"]
   }

   date {
      match => ["[started]", "ISO8601"]
      target => "[started]"
   }

   ruby {
      code => "event['duration_hrs'] = (event['@timestamp'] - event['started']) / 3600 rescue nil"
   }
}

Constant Summary

Constants inherited from Base

Base::RESERVED

Constants included from Config::Mixin

Config::Mixin::CONFIGSORT

Instance Attribute Summary

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Instance Method Summary collapse

Methods inherited from Base

#execute, #initialize, #threadsafe?

Methods included from Config::Mixin

#config_init, included

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #teardown, #terminating?, #to_s

Constructor Details

This class inherits a constructor from LogStash::Filters::Base

Instance Method Details

#filter(event) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/logstash/filters/elasticsearch.rb', line 55

def filter(event)
  return unless filter?(event)

  begin
    query_str = event.sprintf(@query)

    results = @client.search q: query_str, sort: @sort, size: 1

    @fields.each do |old, new|
      event[new] = results['hits']['hits'][0]['_source'][old]
    end

    filter_matched(event)
  rescue => e
    @logger.warn("Failed to query elasticsearch for previous event",
                 :query => query_str, :event => event, :error => e)
  end
end

#registerObject



47
48
49
50
51
52
# File 'lib/logstash/filters/elasticsearch.rb', line 47

def register
  require "elasticsearch"

  @logger.info("New ElasticSearch filter", :hosts => @hosts)
  @client = Elasticsearch::Client.new hosts: @hosts
end