Class: LogStash::Inputs::ElasticJdbc

Inherits:
Elasticsearch
  • Object
show all
Defined in:
lib/logstash/inputs/elastic_jdbc.rb

Overview

This plugin is a simple extension of the Elasticsearch input plugin. We added a tracking_column property so that the Elasticsearch query returns every hit whose tracking column (for example ‘last_update’) holds a value greater than the one stored in the value_tracker. The value_tracker holds the time of the last query made against that index, which is persisted in a last-run file created by the plugin. The query is built from these two pieces of information. This is a sample elastic_jdbc plugin configuration:

input {

# Read all documents from Elasticsearch matching the given query
elastic_jdbc {
  hosts => "localhost"
  tracking_column => "last_update"

  last_run_metadata_path => "/opt/logstash/last_run/index_name"
}

}

Instance Method Summary collapse

Instance Method Details

#build_query ⇒ Object



57
58
59
60
61
62
63
64
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 57

# Builds the Elasticsearch range query that selects documents whose tracking
# column is strictly greater than the last tracked value.
#
# Reads @value_tracker and @tracking_column; stores the JSON string in @query
# and the structure parsed back by LogStash::Json in @base_query.
#
# @return [Object] whatever LogStash::Json.load returns for the generated JSON
def build_query
  # Without a tracker, fall back to the current UTC time. The time is
  # formatted directly: Time.parse only accepts a String, so passing it a
  # Time object raises TypeError.
  reference_time = @value_tracker ? Time.parse(@value_tracker.value.to_s) : Time.now.utc
  last_value = reference_time.iso8601
  column = @tracking_column.to_s
  query = { query: { range: { column => { gt: last_value } } }, sort: ["_doc"] }
  @query = query.to_json
  @base_query = LogStash::Json.load(@query)
end

#do_run_slice(output_queue, slice_id = nil) ⇒ Object

def run



70
71
72
73
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 70

# Rebuilds the tracking query, then hands the slice over to the parent
# implementation with the same arguments.
#
# @param output_queue the queue forwarded to the parent implementation
# @param slice_id the slice identifier forwarded to the parent (nil by default)
def do_run_slice(output_queue, slice_id = nil)
  # Refresh @query/@base_query before the parent runs the slice.
  build_query
  super(output_queue, slice_id)
end

#push_hit(hit, output_queue) ⇒ Object



75
76
77
78
79
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 75

# Lets the parent implementation handle the hit, then records the current
# time in the value tracker and writes the tracker out.
#
# @param hit the hit forwarded to the parent implementation
# @param output_queue the queue forwarded to the parent implementation
# @return the result of @value_tracker.write
def push_hit(hit, output_queue)
  super(hit, output_queue)
  # The tracked value is the time of processing, not a field of the hit.
  processed_at = Time.now.to_s
  @value_tracker.set_value(processed_at)
  @value_tracker.write
end

#register ⇒ Object



44
45
46
47
48
49
50
51
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 44

# Validates the configuration, installs the value tracker, builds the initial
# query and then runs the parent's registration.
#
# @raise [LogStash::ConfigurationError] when @tracking_column is nil
def register
  # NOTE(review): the message mentions :use_column_value, but only
  # @tracking_column is checked here — confirm the wording is intended.
  raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.") if @tracking_column.nil?

  tracker = ValueTracking.build_last_value_tracker(self)
  set_value_tracker(tracker)
  build_query
  super()
end

#run(output_queue) ⇒ Object



66
67
68
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 66

# Runs the input by delegating straight to the parent implementation.
#
# @param output_queue the queue forwarded unchanged to the parent
def run(output_queue)
  super(output_queue)
end

#set_value_tracker(instance) ⇒ Object

def register



53
54
55
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 53

# Stores the given tracker in @value_tracker, where build_query and push_hit
# read it.
#
# @param instance the tracker object to keep
# @return the object that was passed in
def set_value_tracker(instance)
  @value_tracker = instance
end

#stop ⇒ Object



81
82
83
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 81

# Stops the input by delegating to the parent implementation; nothing extra
# is done here.
def stop
  super()
end