Class: LogStash::Outputs::ElasticSearch

Inherits:
Base
  • Object
show all
Includes:
Stud::Buffer
Defined in:
lib/logstash/outputs/elasticsearch.rb

Overview

This output lets you store logs in elasticsearch and is the most recommended output for logstash. If you plan on using the logstash web interface, you’ll need to use this output.

*VERSION NOTE*: Your elasticsearch cluster must be running elasticsearch
%ELASTICSEARCH_VERSION%. If you use any other version of elasticsearch,
you should consider using the [elasticsearch_http](elasticsearch_http)
output instead.

If you want to set other elasticsearch options that are not exposed directly as config options, there are two ways to do so:

  • create an elasticsearch.yml file in the $PWD of the logstash process

  • pass in es.* java properties (java -Des.node.foo= or ruby -J-Des.node.foo=)

This plugin will join your elasticsearch cluster, so it will show up in elasticsearch’s cluster health status.

You can learn more about elasticsearch at <elasticsearch.org>

## Operational Notes

Template management is a new feature and requires Elasticsearch 0.90.5 or later.

If you are still using a version older than this, please upgrade for more benefits than just template management.

Your firewalls will need to permit port 9300 in both directions (from logstash to elasticsearch, and elasticsearch to logstash)

Constant Summary

Constants included from Config::Mixin

Config::Mixin::CONFIGSORT

Instance Attribute Summary

Attributes included from Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Instance Method Summary collapse

Methods inherited from Base

#handle, #handle_worker, #initialize, #worker_setup, #workers_not_supported

Methods included from Config::Mixin

#config_init, included

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #teardown, #terminating?, #to_s

Constructor Details

This class inherits a constructor from LogStash::Outputs::Base

Instance Method Details

#flush(events, teardown = false) ⇒ Object

def receive



308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# File 'lib/logstash/outputs/elasticsearch.rb', line 308

# Sends one batch of buffered events to elasticsearch as a single bulk request.
#
# events   - the buffered items. Each item is destructured and only its first
#            element (the event) is used; the index and type are recomputed
#            here from the event itself.
# teardown - accepted but not used by this method.
#
# Returns whatever the bulk request's `execute!` returns.
def flush(events, teardown=false)
  bulk = @client.bulk
  events.each do |event, _index, _type|
    target_index = event.sprintf(@index)

    # Without a configured index_type, fall back to the event's own "type"
    # field, and to "logs" when the event has none.
    doc_type = @index_type.nil? ? (event["type"] || "logs") : event.sprintf(@index_type)

    # A nil id is passed along when no document_id is configured.
    doc_id = @document_id ? event.sprintf(@document_id) : nil

    bulk.index(target_index, doc_type, doc_id, event.to_json)
  end

  # TODO(sissel): Handle errors. Since bulk requests could mostly succeed
  # (aka partially fail), we need to figure out what documents need to be
  # retried.
  bulk.execute!
end

#get_template_json ⇒ Object



264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# File 'lib/logstash/outputs/elasticsearch.rb', line 264

# Loads the index mapping template into @template_json.
#
# When @template is not set, a default location is chosen:
#   * if this file is being run from inside a jar, the
#     "elasticsearch-template.json" entry at the root of that jar;
#   * otherwise "elasticsearch-template.json" in the current directory;
#   * otherwise "lib/logstash/outputs/elasticsearch/elasticsearch-template.json".
# The chosen file is read with its newlines stripped, and the result is logged.
#
# Raises RuntimeError when no template is configured and neither on-disk
# default exists.
def get_template_json
  if @template.nil?
    if __FILE__ =~ /^(jar:)?file:\/.+!.+/
      begin
        # Running from a jar, assume the template is at the root of the jar.
        jar_path = [__FILE__.split("!").first, "/elasticsearch-template.json"].join("!")
        @template = jar_path
      rescue => ex
        raise "Failed to cache, due to: #{ex}\n#{ex.backtrace}"
      end
    else
      default_paths = [
        "elasticsearch-template.json",
        "lib/logstash/outputs/elasticsearch/elasticsearch-template.json",
      ]
      # File.exist? is used because the File.exists? alias is deprecated and
      # no longer available on Ruby 3.2+.
      @template = default_paths.find { |path| File.exist?(path) }
      if @template.nil?
        raise "You must specify 'template => ...' in your elasticsearch output"
      end
    end
  end
  @template_json = IO.read(@template).gsub(/\n/,'')
  @logger.info("Using mapping template", :template => @template_json)
end

#receive(event) ⇒ Object



303
304
305
306
# File 'lib/logstash/outputs/elasticsearch.rb', line 303

# Queues an event for a later bulk flush.
#
# Events rejected by `output?` are dropped without being buffered.
# NOTE(review): `index` and `type` are not defined inside this method; they
# presumably resolve to methods on the plugin -- confirm. `flush` recomputes
# both values from the event and does not use the buffered ones.
def receive(event)
  buffer_receive([event, index, type]) if output?(event)
end

#register ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/logstash/outputs/elasticsearch.rb', line 154

# Prepares this output for use.
#
# In order, this method:
#   1. requires the elasticsearch jars found under vendor/jar,
#   2. sets up log4j for elasticsearch,
#   3. starts a local elasticsearch when @embedded is set,
#   4. builds the client from the configured cluster/host/port/protocol,
#   5. when @manage_template is set, optionally deletes the existing template
#      (@template_overwrite) and inserts one if no stored template matches
#      the configured index pattern,
#   6. initializes the event buffer.
#
# If the initial template lookup raises, the error is logged and template
# management is switched off for this instance instead of failing.
def register
  # TODO(sissel): find a better way of declaring where the elasticsearch
  # libraries are
  # TODO(sissel): can skip this step if we're running from a jar.
  jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/elasticsearch*/lib/*.jar")
  Dir[jarpath].each do |jar|
    require jar
  end

  # setup log4j properties for elasticsearch
  LogStash::Logger.setup_log4j(@logger)

  if @embedded
    # Default @host with embedded to localhost. This should help avoid
    # newbies tripping on ubuntu and other distros that have a default
    # firewall that blocks multicast.
    @host ||= "localhost"

    # Start elasticsearch local.
    start_local_elasticsearch
  end
  # Required here, after the jars above have been loaded.
  require "jruby-elasticsearch"

  @logger.info("New ElasticSearch output", :cluster => @cluster,
               :host => @host, :port => @port, :embedded => @embedded)
  options = {
    :cluster => @cluster,
    :host => @host,
    :port => @port,
    :bind_host => @bind_host,
    :node_name => @node_name,
  }

  # :node or :transport protocols
  options[:type] = @protocol.to_sym

  options[:bind_port] = @bind_port unless @bind_port.nil?

  # TransportClient requires a number for a port.
  options[:port] = options[:port].to_i if options[:type] == :transport

  @client = ElasticSearch::Client.new(options)

  # Check to see if we *can* get the template
  java_client = @client.instance_eval{@client}
  begin
    check_template = ElasticSearch::GetIndexTemplatesRequest.new(java_client, @template_name)
    result = check_template.execute #=> Run previously...
  # NOTE(review): the rescue is left as broad as the original, presumably so
  # that Java-side exceptions are caught as well -- confirm before narrowing.
  rescue Exception => e
    @logger.error("Unable to check template.  Automatic template management disabled.", :error => e.to_s)
    @manage_template = false
  end

  if @manage_template
    @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
    if @template_overwrite
      @logger.info("Template overwrite enabled.  Deleting template if it exists.", :template_overwrite => @template_overwrite.to_s)
      # `result` still holds the lookup for @template_name made above.
      if !result.getIndexTemplates.isEmpty
        delete_template = ElasticSearch::DeleteIndexTemplateRequest.new(java_client, @template_name)
        result = delete_template.execute
        if result.isAcknowledged
          @logger.info("Successfully deleted template", :template_name => @template_name)
        else
          @logger.error("Failed to delete template", :template_name => @template_name)
        end
      end
    end # end if @template_overwrite
    has_template = false
    @logger.debug("Fetching all templates...")
    gettemplates = ElasticSearch::GetIndexTemplatesRequest.new(java_client, "*")
    result = gettemplates.execute
    # Results of this come as a list, so we need to iterate through it
    if !result.getIndexTemplates.isEmpty
      template_list = result.getIndexTemplates
      # Maps each stored template's name to its index pattern.
      templates = {}
      template_list.size.times do |i|
        template_data = template_list.get(i)
        templates[template_data.name] = template_data.template
      end
      # Turn the first %{...} reference in the configured index into a
      # wildcard; the alternate form also swallows a "-" directly before it.
      template_idx_name = @index.sub(/%{[^}]+}/,'*')
      alt_template_idx_name = @index.sub(/-%{[^}]+}/,'*')
      if !templates.any? { |k,v| v == template_idx_name || v == alt_template_idx_name }
        @logger.debug("No logstash template found in Elasticsearch", :has_template => has_template, :name => template_idx_name, :alt => alt_template_idx_name)
      else
        has_template = true
        @logger.info("Found existing Logstash template match.", :has_template => has_template, :name => template_idx_name, :alt => alt_template_idx_name, :templates => templates.to_s)
      end
    end
    if !has_template #=> No template found, we're going to add one
      get_template_json
      put_template = ElasticSearch::PutIndexTemplateRequest.new(java_client, @template_name, @template_json)
      result = put_template.execute
      if result.isAcknowledged
        @logger.info("Successfully inserted template", :template_name => @template_name)
      else
        @logger.error("Failed to insert template", :template_name => @template_name)
      end
    end
  end # if @manage_templates

  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )
end