Class: LogStash::Outputs::ElasticSearch
- Inherits: Base
  - Object
  - Base
  - LogStash::Outputs::ElasticSearch
- Includes:
- Stud::Buffer
- Defined in:
- lib/logstash/outputs/elasticsearch.rb
Overview
This output lets you store logs in Elasticsearch and is the recommended output for Logstash. If you plan on using the Kibana web interface, you'll need to use this output.
*VERSION NOTE*: Your Elasticsearch cluster must be running Elasticsearch 1.0.0 or later.
If you want to set other Elasticsearch options that are not exposed directly as configuration options, there are two methods:
-
Create an `elasticsearch.yml` file in the $PWD of the Logstash process
-
Pass in es.* java properties (`java -Des.node.foo=` or `ruby -J-Des.node.foo=`)
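For example, a minimal `elasticsearch.yml` dropped into the working directory of the Logstash process might look like the following (the specific settings shown are illustrative, not required):

```yaml
# elasticsearch.yml in the $PWD of the Logstash process
node.name: logstash-indexer-1
cluster.name: my-cluster
discovery.zen.ping.multicast.enabled: false
```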
With the default `protocol` setting ("node"), this plugin will join your Elasticsearch cluster as a client node, so it will show up in Elasticsearch's cluster status.
You can learn more about Elasticsearch at <www.elastic.co/products/elasticsearch>
Operational Notes
If using the default `protocol` setting ("node"), your firewalls might need to permit port 9300 in both directions (from Logstash to Elasticsearch, and Elasticsearch to Logstash).
Retry Policy
By default, all bulk requests to ES are synchronous. Not every event in a bulk request always makes it in successfully. For example, an event may not be formatted correctly for the index it is targeting (a type mismatch in the mapping). To minimize loss of events, we have a specific retry policy in place. We retry all events which fail to reach Elasticsearch due to network-related issues. We retry specific events under a separate policy described below: events of this nature are ones which come back with ES error codes classed as retryable errors.
*Retryable Errors:*
-
429, Too Many Requests (RFC6585)
-
503, The server is currently unable to handle the request due to a temporary overloading or maintenance of the server.
*Possibly retryable errors*
You may run into a situation where ES rejects an event because a property does not match the type already defined in the mapping. By default, this error is NOT retryable and will generate an error in the log. However, you may prefer to send the event to ES anyway by renaming its type. This will affect your analytics if you depend on the `_type` field in any way, but for some users it is preferable to have mislabelled events rather than not have them at all. See @rename_type_on_mismatch.
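The renaming idea can be sketched as follows: append a unique suffix to the event's type so the retried event indexes into a fresh mapping instead of conflicting with the existing one. This is a standalone sketch (the helper name is ours, not the plugin's); the plugin itself appends the current epoch time in #rename_type.

```ruby
# Sketch of rename-on-mismatch: when ES rejects an event with a mapping
# conflict (HTTP 400, MapperParsingException), retry it under a new,
# timestamped type so it lands in a fresh mapping instead of being dropped.
def rename_type_for_retry(action)
  action_name, params, event = action
  params[:_type] = params[:_type] + Time.now.to_i.to_s
  [action_name, params, event]
end

action = ["index", { :_index => "logs-2015.01.01", :_type => "apache" }, { "message" => "hello" }]
renamed = rename_type_for_retry(action)
puts renamed[1][:_type]  # e.g. "apache1420070400"
```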
Here are the rules of what is retried when:
-
Block and retry all events in a bulk response that experience transient network exceptions, until a successful submission is received by Elasticsearch.
-
Retry the subset of sent events which resulted in ES errors of a retryable nature; these codes can be found in RETRYABLE_CODES.
-
Events which returned retryable error codes are pushed onto a separate queue for retrying. Events in this queue are retried a maximum of 5 times by default (configurable through :max_retries). The size of this queue is capped by the value set in :retry_max_items.
-
Events from the retry queue are submitted again either when the queue reaches its max size or when the max interval time is reached, which is set in :retry_max_interval.
-
Events which are not retryable or have reached their max retry count are logged to stderr.
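The classification step in the rules above can be sketched as a simplified, self-contained model (not the plugin's actual code): partition a bulk response into actions to retry and actions to drop, using the same code lists and retry cap.

```ruby
# Simplified model of the retry classification. Each action is
# [name, params, event]; the retry count is tracked on the event,
# mirroring the @metadata retry_count the plugin sets in #receive.
RETRYABLE_CODES = [409, 429, 503]
SUCCESS_CODES   = [200, 201]
MAX_RETRIES     = 5

def classify(actions_with_codes)
  to_retry, dropped = [], []
  actions_with_codes.each do |action, code|
    next if SUCCESS_CODES.include?(code)        # succeeded, nothing to do
    event = action[2]
    if RETRYABLE_CODES.include?(code) && event["retry_count"] < MAX_RETRIES
      event["retry_count"] += 1
      to_retry << action                        # goes back on the retry queue
    else
      dropped << action                         # non-retryable or retries exhausted
    end
  end
  [to_retry, dropped]
end

actions = [
  [["index", {}, { "retry_count" => 0 }], 201], # success
  [["index", {}, { "retry_count" => 0 }], 429], # retryable
  [["index", {}, { "retry_count" => 5 }], 503], # retries exhausted
  [["index", {}, { "retry_count" => 0 }], 400], # not retryable
]
to_retry, dropped = classify(actions)
```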
Constant Summary collapse
- RETRYABLE_CODES = [409, 429, 503]
- SUCCESS_CODES = [200, 201]
Instance Attribute Summary collapse
- #client ⇒ Object (readonly)
Returns the value of attribute client.
Instance Method Summary collapse
- #flush(actions, teardown = false) ⇒ Object
- #get_template ⇒ Object
- #mapping_mismatch?(error_message) ⇒ Boolean
- #receive(event) ⇒ Object
- #register ⇒ Object
- #rename_type(action) ⇒ Object
- #submit(actions) ⇒ Object
Synchronizes the @current_client.bulk call to avoid concurrency/thread-safety issues with client libraries which might not be thread safe.
- #teardown ⇒ Object
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
# File 'lib/logstash/outputs/elasticsearch.rb', line 73

def client
  @client
end
Instance Method Details
#flush(actions, teardown = false) ⇒ Object
# File 'lib/logstash/outputs/elasticsearch.rb', line 589

def flush(actions, teardown = false)
  begin
    submit(actions)
  rescue => e
    @logger.error "Got error to send bulk of actions: #{e.message}"
    raise e
  ensure
    unless @protocol == "node"
      @logger.debug? and @logger.debug "Shifting current elasticsearch client"
      shift_client
    end
  end
end
#get_template ⇒ Object
# File 'lib/logstash/outputs/elasticsearch.rb', line 499

def get_template
  if @template.nil?
    @template = ::File.expand_path('elasticsearch/elasticsearch-template.json', ::File.dirname(__FILE__))
    if !File.exists?(@template)
      raise "You must specify 'template => ...' in your elasticsearch output (I looked for '#{@template}')"
    end
  end
  template_json = IO.read(@template).gsub(/\n/, '')
  template = LogStash::Json.load(template_json)
  @logger.info("Using mapping template", :template => template)
  return template
end
#mapping_mismatch?(error_message) ⇒ Boolean
# File 'lib/logstash/outputs/elasticsearch.rb', line 580

def mapping_mismatch?(error_message)
  error_message.include? "MapperParsingException"
end
#receive(event) ⇒ Object
# File 'lib/logstash/outputs/elasticsearch.rb', line 513

def receive(event)
  return unless output?(event)

  # block until we have not maxed out our
  # retry queue. This is applying back-pressure
  # to slow down the receive-rate
  @retry_flush_mutex.synchronize {
    @retry_queue_not_full.wait(@retry_flush_mutex) while @retry_queue.size > @retry_max_items
  }

  event['@metadata']['retry_count'] = 0

  # Set the 'type' value for the index.
  type = if @document_type
    event.sprintf(@document_type)
  elsif @index_type # deprecated
    event.sprintf(@index_type)
  else
    event["type"] || "logs"
  end

  params = {
    :_id => @document_id ? event.sprintf(@document_id) : nil,
    :_index => event.sprintf(@index),
    :_type => type,
    :_routing => @routing ? event.sprintf(@routing) : nil
  }

  params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @action == 'update' && @upsert != ""

  buffer_receive([event.sprintf(@action), params, event])
end
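The type resolution inside #receive is a simple precedence chain: `document_type`, then the deprecated `index_type`, then the event's own "type" field, then the literal "logs". As a standalone sketch (the helper name is ours; the real method also applies event.sprintf interpolation, omitted here):

```ruby
# Resolve the _type for an event: document_type wins, then the deprecated
# index_type, then the event's own "type" field, then the default "logs".
def resolve_type(event, document_type: nil, index_type: nil)
  document_type || index_type || event["type"] || "logs"
end

resolve_type({ "type" => "apache" })       # => "apache"
resolve_type({}, document_type: "syslog")  # => "syslog"
resolve_type({})                           # => "logs"
```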
#register ⇒ Object
# File 'lib/logstash/outputs/elasticsearch.rb', line 342

def register
  @submit_mutex = Mutex.new
  # retry-specific variables
  @retry_flush_mutex = Mutex.new
  @retry_teardown_requested = Concurrent::AtomicBoolean.new(false)
  # needs flushing when interval
  @retry_queue_needs_flushing = ConditionVariable.new
  @retry_queue_not_full = ConditionVariable.new
  @retry_queue = Queue.new

  client_settings = {}

  if @protocol.nil?
    @protocol = LogStash::Environment.jruby? ? "node" : "http"
  end

  if @protocol == "http"
    if @action == "create_unless_exists"
      raise(LogStash::ConfigurationError, "action => 'create_unless_exists' is not supported under the HTTP protocol");
    end

    client_settings[:path] = "/#{@path}/".gsub(/\/+/, "/") # Normalize slashes
    @logger.debug? && @logger.debug("Normalizing http path", :path => @path, :normalized => client_settings[:path])
  end

  if ["node", "transport"].include?(@protocol)
    # Node or TransportClient; requires JRuby
    raise(LogStash::PluginLoadingError, "This configuration requires JRuby. If you are not using JRuby, you must set 'protocol' to 'http'. For example: output { elasticsearch { protocol => \"http\" } }") unless LogStash::Environment.jruby?

    client_settings["cluster.name"] = @cluster if @cluster
    client_settings["network.host"] = @bind_host if @bind_host
    client_settings["transport.tcp.port"] = @bind_port if @bind_port
    client_settings["client.transport.sniff"] = @sniffing

    if @node_name
      client_settings["node.name"] = @node_name
    else
      client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}"
    end

    @@plugins.each do |plugin|
      name = plugin.name.split('-')[-1]
      client_settings.merge!(LogStash::Outputs::ElasticSearch.const_get(name.capitalize).create_client_config(self))
    end
  end

  require "logstash/outputs/elasticsearch/protocol"

  if @port.nil?
    @port = case @protocol
      when "http"; "9200"
      when "transport", "node"; "9300-9305"
    end
  end

  if @host.nil? && @protocol != "node" # node can use zen discovery
    @logger.info("No 'host' set in elasticsearch output. Defaulting to localhost")
    @host = ["localhost"]
  end

  client_settings.merge! setup_ssl()
  client_settings.merge! setup_proxy()

  common_options = {
    :protocol => @protocol,
    :client_settings => client_settings
  }

  common_options.merge! setup_basic_auth()

  # Update API setup
  update_options = {
    :upsert => @upsert,
    :doc_as_upsert => @doc_as_upsert
  }
  common_options.merge! update_options if @action == 'update'

  client_class = case @protocol
    when "transport"
      LogStash::Outputs::Elasticsearch::Protocols::TransportClient
    when "node"
      LogStash::Outputs::Elasticsearch::Protocols::NodeClient
    when /http/
      LogStash::Outputs::Elasticsearch::Protocols::HTTPClient
  end

  if @embedded
    raise(LogStash::ConfigurationError, "The 'embedded => true' setting is only valid for the elasticsearch output under JRuby. You are running #{RUBY_DESCRIPTION}") unless LogStash::Environment.jruby?
    @logger.warn("The 'embedded => true' setting is enabled. This is not recommended for production use!!!")
    # LogStash::Environment.load_elasticsearch_jars!

    # Default @host with embedded to localhost. This should help avoid
    # newbies tripping on ubuntu and other distros that have a default
    # firewall that blocks multicast.
    @host ||= ["localhost"]

    # Start Elasticsearch local.
    start_local_elasticsearch
  end

  @client = Array.new

  if protocol == "node" || @host.nil? # if @protocol is "node" or @host is not set
    options = { :host => @host, :port => @port }.merge(common_options)
    @client = [client_class.new(options)]
  else # if @protocol in ["transport","http"]
    @client = @host.map do |host|
      (_host, _port) = host.split ":"
      options = { :host => _host, :port => _port || @port }.merge(common_options)
      @logger.info "Create client to elasticsearch server on #{_host}:#{_port}"
      client_class.new(options)
    end # @host.map
  end

  if @manage_template
    for client in @client
      begin
        @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
        client.template_install(@template_name, get_template, @template_overwrite)
        break
      rescue => e
        @logger.error("Failed to install template: #{e.message}")
      end
    end # for @client loop
  end # if @manage_templates

  @logger.info("New Elasticsearch output", :cluster => @cluster,
               :host => @host, :port => @port, :embedded => @embedded,
               :protocol => @protocol)

  @client_idx = 0
  @current_client = @client[@client_idx]

  buffer_initialize(
    :max_items => @flush_size,
    :max_interval => @idle_flush_time,
    :logger => @logger
  )

  @retry_timer_thread = Thread.new do
    loop do
      sleep(@retry_max_interval)
      @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.signal }
    end
  end

  @retry_thread = Thread.new do
    while @retry_teardown_requested.false?
      @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.wait(@retry_flush_mutex) }
      retry_flush
    end
  end
end
#rename_type(action) ⇒ Object
# File 'lib/logstash/outputs/elasticsearch.rb', line 584

def rename_type(action)
  action[1][:_type] = action[1][:_type] + Time.now.to_i.to_s
  action
end
#submit(actions) ⇒ Object
Synchronizes the @current_client.bulk call to avoid concurrency/thread-safety issues with client libraries which might not be thread safe. The submit method can be called from both the Stud::Buffer flush thread and from our own retry thread.
# File 'lib/logstash/outputs/elasticsearch.rb', line 550

def submit(actions)
  es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash] }

  @submit_mutex.lock
  begin
    bulk_response = @current_client.bulk(es_actions)
  ensure
    @submit_mutex.unlock
  end

  if bulk_response["errors"]
    actions_with_responses = actions.zip(bulk_response['statuses'], bulk_response['error_messages'])
    actions_to_retry = []
    actions_with_responses.each do |action, resp_code, error_message|
      if RETRYABLE_CODES.include?(resp_code)
        @logger.warn "retrying failed action with response code: #{resp_code}"
        actions_to_retry << action
      elsif @rename_type_on_mismatch and resp_code == 400 and mapping_mismatch?(error_message)
        @logger.warn "retrying mapping mismatch: #{resp_code}"
        action[2]["tags"] ||= []
        actions_to_retry << rename_type(action)
      elsif not SUCCESS_CODES.include?(resp_code)
        @logger.warn "failed action with response of #{resp_code}, dropping action: #{action}"
      end
    end
    retry_push(actions_to_retry) unless actions_to_retry.empty?
  end
end
#teardown ⇒ Object
# File 'lib/logstash/outputs/elasticsearch.rb', line 604

def teardown
  if @cacert # remove temporary jks store created from the cacert
    File.delete(@truststore)
  end

  @retry_teardown_requested.make_true
  # First, make sure retry_timer_thread is stopped
  # to ensure we do not signal a retry based on
  # the retry interval.
  Thread.kill(@retry_timer_thread)
  @retry_timer_thread.join
  # Signal flushing in the case that #retry_flush is in
  # the process of waiting for a signal.
  @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.signal }
  # Now, #retry_flush is ensured to not be in a state of
  # waiting and can be safely joined into the main thread
  # for further final execution of an in-process remaining call.
  @retry_thread.join
  # execute any final actions along with a proceeding retry for any
  # final actions that did not succeed.
  buffer_flush(:final => true)
  retry_flush
end
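The shutdown ordering in #teardown can be illustrated with a minimal standalone model (our own sketch, not the plugin's code): kill and join the timer thread first so no new flush signals arrive, then wake the flush thread one last time so it is never left waiting, then join it.

```ruby
require "thread"

# Minimal model of the teardown ordering:
# 1) stop the timer so it cannot signal new flushes,
# 2) signal the flush worker one final time, 3) join it safely.
mutex   = Mutex.new
cond    = ConditionVariable.new
done    = false
flushes = 0

timer = Thread.new do
  loop do
    sleep 0.05
    mutex.synchronize { cond.signal }  # periodic "queue needs flushing"
  end
end

worker = Thread.new do
  mutex.synchronize do
    until done
      cond.wait(mutex)  # wait for a flush signal
      flushes += 1      # stand-in for retry_flush
    end
  end
end

sleep 0.2                                       # let some timer-driven flushes run
Thread.kill(timer); timer.join                  # 1) stop the timer first
mutex.synchronize { done = true; cond.signal }  # 2) final wake-up for the worker
worker.join                                     # 3) now safe to join
```

Setting `done` under the mutex before the final signal is what prevents the lost-wakeup deadlock the real teardown comments describe.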