Class: LogStash::Inputs::ElasticJdbc
- Inherits:
-
Elasticsearch
- Object
- Elasticsearch
- LogStash::Inputs::ElasticJdbc
- Defined in:
- lib/logstash/inputs/elastic_jdbc.rb
Overview
This plugin is a simple extension of Elasticsearch input plugin. We added tracking_column property for search in elasticsearch query all hits that contains the ‘last_update’ value bigger that the value_tracker. The value_tracker contains the last consult to that index stored in a last run file created. We build the query based on the above described. This is a sample of elastic_jdbc plugin statement: input {
# Read all documents from Elasticsearch matching the given query
elastic_jdbc {
hosts => "localhost"
tracking_column => "last_update"
last_run_metadata_path => “/opt/logstash/last_run/index_name” }
}
Instance Method Summary collapse
- #build_query ⇒ Object
-
#do_run_slice(output_queue, slice_id = nil) ⇒ Object
def run.
- #push_hit(hit, output_queue) ⇒ Object
- #register ⇒ Object
- #run(output_queue) ⇒ Object
-
#set_value_tracker(instance) ⇒ Object
def register.
- #stop ⇒ Object
Instance Method Details
#build_query ⇒ Object
57 58 59 60 61 62 63 64 |
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 57 def build_query time_now = Time.now.utc last_value = @value_tracker ? Time.parse(@value_tracker.value.to_s).iso8601 : Time.parse(time_now).iso8601 column = @tracking_column.to_s query = {query: { range: {column => {gt: last_value.to_s}}}, sort: ["_doc"]} @query = query.to_json @base_query = LogStash::Json.load(@query) end |
#do_run_slice(output_queue, slice_id = nil) ⇒ Object
def run
70 71 72 73 |
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 70 def do_run_slice(output_queue, slice_id=nil) build_query super end |
#push_hit(hit, output_queue) ⇒ Object
75 76 77 78 79 |
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 75 def push_hit(hit, output_queue) super @value_tracker.set_value(Time.now.to_s) @value_tracker.write end |
#register ⇒ Object
44 45 46 47 48 49 50 51 |
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 44 def register if @tracking_column.nil? raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.") end set_value_tracker(ValueTracking.build_last_value_tracker(self)) build_query super end |
#run(output_queue) ⇒ Object
66 67 68 |
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 66 def run(output_queue) super end |
#set_value_tracker(instance) ⇒ Object
def register
53 54 55 |
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 53 def set_value_tracker(instance) @value_tracker = instance end |
#stop ⇒ Object
81 82 83 |
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 81 def stop super end |