Class: LogStash::Inputs::ElasticJdbc
- Inherits:
-
Elasticsearch
- Object
- Elasticsearch
- LogStash::Inputs::ElasticJdbc
- Defined in:
- lib/logstash/inputs/elastic_jdbc.rb
Overview
This plugin is a simple extension of the Elasticsearch input plugin. It adds a tracking_column property so that the Elasticsearch query returns only the hits whose 'last_update' value is greater than the value held by the value_tracker. The value_tracker holds the value recorded by the last query against that index, which is stored in a last-run file created by the plugin. The query is built as described above. This is a sample elastic_jdbc plugin configuration: input {
# Read all documents from Elasticsearch matching the given query
elastic_jdbc {
hosts => "localhost"
tracking_column => "last_update"
last_run_metadata_path => "/opt/logstash/last_run/index_name" }
}
Instance Method Summary collapse
- #build_query ⇒ Object
-
#do_run_slice(output_queue, slice_id = nil) ⇒ Object
def run.
- #push_hit(hit, output_queue) ⇒ Object
- #register ⇒ Object
- #run(output_queue) ⇒ Object
-
#set_value_tracker(instance) ⇒ Object
def register.
- #stop ⇒ Object
Instance Method Details
#build_query ⇒ Object
57 58 59 60 61 62 63 64 |
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 57
# Builds the Elasticsearch query that selects documents whose tracking column
# is greater than the last tracked value, sorted ascending on that column.
#
# The lower bound is the value held by @value_tracker, parsed and re-emitted
# as ISO 8601. When no tracker is set, the current UTC time is used instead.
# The JSON text is stored in @query and its parsed form in @base_query.
#
# @return [Object] the parsed query, as returned by LogStash::Json.load
def build_query
  last_value =
    if @value_tracker
      Time.parse(@value_tracker.value.to_s).iso8601
    else
      # Time.parse accepts only strings; format the current UTC time directly
      # rather than passing a Time object through Time.parse (TypeError).
      Time.now.utc.iso8601
    end

  column = @tracking_column.to_s
  query = {
    query: { range: { column => { gt: last_value } } },
    sort: [{ column => { order: "asc" } }]
  }

  @query = query.to_json
  @base_query = LogStash::Json.load(@query)
end
#do_run_slice(output_queue, slice_id = nil) ⇒ Object
def run
70 71 72 73 |
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 70
# Rebuilds the tracking query, then hands the slice to the parent
# implementation with the same arguments.
#
# @param output_queue [Object] queue forwarded unchanged to the parent
# @param slice_id [Object, nil] slice identifier forwarded unchanged to the parent
# @return [Object] whatever the parent implementation returns
def do_run_slice(output_queue, slice_id = nil)
  # Refresh @query / @base_query before the parent runs.
  build_query
  super(output_queue, slice_id)
end
#push_hit(hit, output_queue) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 75
# Turns one search hit into an event, pushes it to the output queue, and
# records the event's tracking-column value through the value tracker.
#
# @param hit [Hash] search hit; its '_source' entry becomes the event payload
# @param output_queue [#<<] destination that receives the decorated event
# @return [Object] the result of @value_tracker.write
# @raise [Exception] when @docinfo is set and the event's docinfo target
#   field holds something other than a Hash
def push_hit(hit, output_queue)
  event = LogStash::Event.new(hit['_source'])

  if @docinfo
    # The target field may not be updatable in place: read it, fill it in,
    # and write it back to the event once complete.
    target = event.get(@docinfo_target) || {}

    unless target.is_a?(Hash)
      @logger.error("Elasticsearch Input: Incompatible Event, incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :docinfo_target_type => target.class, :event => event)

      # TODO: (colin) I am not sure raising is a good strategy here?
      raise Exception, "Elasticsearch input: incompatible event"
    end

    @docinfo_fields.each { |name| target[name] = hit[name] }
    event.set(@docinfo_target, target)
  end

  decorate(event)
  output_queue << event

  # Record the tracking-column value carried by this event and write it out
  # through the tracker.
  tracked = event.get(@tracking_column)
  @value_tracker.set_value(tracked)
  @value_tracker.write
end
#register ⇒ Object
44 45 46 47 48 49 50 51 |
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 44
# Validates the configuration, runs the parent registration, installs the
# last-value tracker and builds the initial query.
#
# @return [Object] the result of build_query
# @raise [LogStash::ConfigurationError] when @tracking_column is nil
def register
  # NOTE(review): the message mentions :use_column_value, but only
  # @tracking_column is checked here — confirm the wording is intended.
  raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.") if @tracking_column.nil?

  super

  tracker = ValueTracking.build_last_value_tracker(self)
  set_value_tracker(tracker)
  build_query
end
#run(output_queue) ⇒ Object
66 67 68 |
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 66
# Delegates straight to the parent implementation with the same queue.
#
# @param output_queue [Object] queue forwarded unchanged to the parent
# @return [Object] whatever the parent implementation returns
def run(output_queue)
  super(output_queue)
end
#set_value_tracker(instance) ⇒ Object
def register
53 54 55 |
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 53
# Stores the given tracker in @value_tracker.
#
# @param instance [Object] tracker object to keep
# @return [Object] the same object that was passed in
def set_value_tracker(instance)
  @value_tracker = instance
end
#stop ⇒ Object
103 104 105 |
# File 'lib/logstash/inputs/elastic_jdbc.rb', line 103
# Delegates straight to the parent implementation.
#
# @return [Object] whatever the parent implementation returns
def stop
  super()
end