Class: LogStash::Outputs::ElasticSearchHTTP
- Includes:
- Stud::Buffer
- Defined in:
- lib/logstash/outputs/elasticsearch_http.rb
Overview
This output lets you store logs in elasticsearch.
This plugin uses the HTTP/REST interface to ElasticSearch, which usually lets you use any version of elasticsearch server. It is known to work with elasticsearch %ELASTICSEARCH_VERSION%
You can learn more about elasticsearch at <elasticsearch.org>
Constant Summary
Constants included from Config::Mixin
Instance Attribute Summary
Attributes included from Config::Mixin
Attributes inherited from Plugin
Instance Method Summary collapse
-
#flush(events, teardown = false) ⇒ Object
def receive.
- #get_template_json ⇒ Object
-
#post(body) ⇒ Object
def receive_bulk.
- #receive(event) ⇒ Object
- #register ⇒ Object
-
#teardown ⇒ Object
def post.
- #template_action(command) ⇒ Object
Methods inherited from Base
#handle, #handle_worker, #initialize, #worker_setup, #workers_not_supported
Methods included from Config::Mixin
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s
Constructor Details
This class inherits a constructor from LogStash::Outputs::Base
Instance Method Details
#flush(events, teardown = false) ⇒ Object
def receive
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/logstash/outputs/elasticsearch_http.rb', line 204 def flush(events, teardown=false) # Avoid creating a new string for newline every time newline = "\n".freeze body = events.collect do |event, index, type| index = event.sprintf(@index) # Set the 'type' value for the index. if @index_type.nil? type = event["type"] || "logs" else type = event.sprintf(@index_type) end header = { "index" => { "_index" => index, "_type" => type } } header["index"]["_id"] = event.sprintf(@document_id) if !@document_id.nil? [ header.to_json, newline, event.to_json, newline ] end.flatten post(body.join("")) end |
#get_template_json ⇒ Object
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 |
# File 'lib/logstash/outputs/elasticsearch_http.rb', line 174 def get_template_json if @template.nil? if __FILE__ =~ /^(jar:)?file:\/.+!.+/ begin # Running from a jar, assume types.db is at the root. jar_path = [__FILE__.split("!").first, "/elasticsearch-template.json"].join("!") @template = jar_path rescue => ex raise "Failed to cache, due to: #{ex}\n#{ex.backtrace}" end else if File.exists?("elasticsearch-template.json") @template = "elasticsearch-template.json" elsif File.exists?("lib/logstash/outputs/elasticsearch/elasticsearch-template.json") @template = "lib/logstash/outputs/elasticsearch/elasticsearch-template.json" else raise "You must specify 'template => ...' in your elasticsearch_http output" end end end @template_json = IO.read(@template).gsub(/\n/,'') @logger.info("Using mapping template", :template => @template_json) end |
#post(body) ⇒ Object
def receive_bulk
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 |
# File 'lib/logstash/outputs/elasticsearch_http.rb', line 225 def post(body) begin response = @agent.post!(@bulk_url, :body => body) rescue EOFError @logger.warn("EOF while writing request or reading response header from elasticsearch", :host => @host, :port => @port) return # abort this flush end # Consume the body for error checking # This will also free up the connection for reuse. body = "" begin response.read_body { |chunk| body += chunk } rescue EOFError @logger.warn("EOF while reading response body from elasticsearch", :host => @host, :port => @port) return # abort this flush end if response.status != 200 @logger.error("Error writing (bulk) to elasticsearch", :response => response, :response_body => body, :request_body => @queue.join("\n")) return end end |
#receive(event) ⇒ Object
199 200 201 202 |
# File 'lib/logstash/outputs/elasticsearch_http.rb', line 199 def receive(event) return unless output?(event) buffer_receive([event, index, type]) end |
#register ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/logstash/outputs/elasticsearch_http.rb', line 96 def register require "ftw" # gem ftw @agent = FTW::Agent.new @queue = [] auth = @user && @password ? "#{@user}:#{@password.value}@" : "" @bulk_url = "http://#{auth}#{@host}:#{@port}/_bulk?replication=#{@replication}" if @manage_template @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s) template_search_url = "http://#{auth}#{@host}:#{@port}/_template/*" @template_url = "http://#{auth}#{@host}:#{@port}/_template/#{@template_name}" if @template_overwrite @logger.info("Template overwrite enabled. Deleting existing template.", :template_overwrite => @template_overwrite.to_s) response = @agent.get!(@template_url) template_action('delete') if response.status == 200 #=> Purge the old template if it exists end @logger.debug("Template Search URL:", :template_search_url => template_search_url) has_template = false template_idx_name = @index.sub(/%{[^}]+}/,'*') alt_template_idx_name = @index.sub(/-%{[^}]+}/,'*') # Get the template data response = @agent.get!(template_search_url) json = "" if response.status == 404 #=> This condition can occcur when no template has ever been appended @logger.info("No template found in Elasticsearch...") get_template_json template_action('put') elsif response.status == 200 begin response.read_body { |c| json << c } results = JSON.parse(json) rescue Exception => e @logger.error("Error parsing JSON", :json => json, :results => results.to_s, :error => e.to_s) raise "Exception in parsing JSON", e end if !results.any? { |k,v| v["template"] == template_idx_name || v["template"] == alt_template_idx_name } @logger.debug("No template found in Elasticsearch", :has_template => has_template, :name => template_idx_name, :alt => alt_template_idx_name) get_template_json template_action('put') end else #=> Some other status code? @logger.error("Could not check for existing template. Check status code.", :status => response.status.to_s) end # end if response.status == 200 end # end if @manage_template buffer_initialize( :max_items => @flush_size, :max_interval => @idle_flush_time, :logger => @logger ) end |
#teardown ⇒ Object
def post
253 254 255 |
# File 'lib/logstash/outputs/elasticsearch_http.rb', line 253 def teardown buffer_flush(:final => true) end |
#template_action(command) ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/logstash/outputs/elasticsearch_http.rb', line 148 def template_action(command) begin if command == 'delete' response = @agent.delete!(@template_url) response.discard_body elsif command == 'put' response = @agent.put!(@template_url, :body => @template_json) response.discard_body end rescue EOFError @logger.warn("EOF while attempting request or reading response header from elasticsearch", :host => @host, :port => @port) return # abort this action end if response.status != 200 @logger.error("Error acting on elasticsearch mapping template", :response => response, :action => command, :request_url => @template_url) return end @logger.info("Successfully deleted template", :template_url => @template_url) if command == 'delete' @logger.info("Successfully applied template", :template_url => @template_url) if command == 'put' end |