Class: LogStash::Outputs::ElasticSearch

Inherits:
Base
  • Object
Includes:
DataStreamSupport, Ilm, PluginMixins::DeprecationLoggerSupport, PluginMixins::ElasticSearch::APIConfigs, PluginMixins::ElasticSearch::Common, PluginMixins::NormalizeConfigSupport
Defined in:
lib/logstash/outputs/elasticsearch.rb,
lib/logstash/outputs/elasticsearch/ilm.rb,
lib/logstash/outputs/elasticsearch/http_client.rb,
lib/logstash/outputs/elasticsearch/license_checker.rb,
lib/logstash/outputs/elasticsearch/http_client/pool.rb,
lib/logstash/outputs/elasticsearch/template_manager.rb,
lib/logstash/outputs/elasticsearch/data_stream_support.rb,
lib/logstash/outputs/elasticsearch/http_client_builder.rb,
lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb

Overview

Compatibility Note

NOTE: Starting with Elasticsearch 5.3, there's an HTTP setting called `http.content_type.required`. If this option is set to `true`, and you are using Logstash 2.4 through 5.2, you need to update the Elasticsearch output plugin to version 6.2.5 or higher.

This plugin is the recommended method of storing logs in Elasticsearch. If you plan on using the Kibana web interface, you’ll want to use this output.

This output only speaks the HTTP protocol. HTTP is the preferred protocol for interacting with Elasticsearch as of Logstash 2.0. We strongly encourage the use of HTTP over the node protocol for a number of reasons. HTTP is only marginally slower, yet far easier to administer and work with. When using the HTTP protocol one may upgrade Elasticsearch versions without having to upgrade Logstash in lock-step.

You can learn more about Elasticsearch at https://www.elastic.co/products/elasticsearch

Template management for Elasticsearch 5.x

The index template for this version (Logstash 5.0) has been changed to reflect Elasticsearch's mapping changes in version 5.0. Most importantly, the subfield for string multi-fields has changed from `.raw` to `.keyword` to match the ES default behavior.

**Users installing ES 5.x and LS 5.x** This change will not affect you and you will continue to use the ES defaults.

**Users upgrading from LS 2.x to LS 5.x with ES 5.x** LS will not force-upgrade the template if a `logstash` template already exists. This means you will still use `.raw` for sub-fields coming from 2.x. If you choose to use the new template, you will have to reindex your data after the new template is installed.

Retry Policy

The retry policy has changed significantly in the 2.2.0 release. This plugin uses the Elasticsearch bulk API to optimize its imports into Elasticsearch. These requests may experience either partial or total failures.

The following errors are retried infinitely:

  • Network errors (inability to connect)

  • 429 (Too many requests) and

  • 503 (Service unavailable) errors

NOTE: 409 exceptions are no longer retried. Please set a higher `retry_on_conflict` value if you experience 409 exceptions. It is more performant for Elasticsearch to retry these exceptions than for this plugin to do so.
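The classification above can be sketched as follows. This is a minimal illustration of the policy described in this section, not the plugin's internals; the constant and method names are hypothetical:

```ruby
# Illustrative sketch: network errors, 429 (Too many requests), and
# 503 (Service unavailable) are retried indefinitely, while 409
# conflicts are passed through so Elasticsearch's own
# `retry_on_conflict` mechanism can handle them.
RETRYABLE_STATUS_CODES = [429, 503].freeze

def retryable_status?(status)
  RETRYABLE_STATUS_CODES.include?(status)
end

retryable_status?(429) # => true
retryable_status?(409) # => false
```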

Batch Sizes

This plugin attempts to send batches of events as a single request. However, if a request exceeds 20 MiB we will break it up into multiple batch requests. If a single document exceeds 20 MiB it will still be sent as a single request.
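The splitting rule can be sketched as follows. This is a simplified illustration of the behavior described above, not the plugin's actual bulk code (`partition_bulk` is a hypothetical name):

```ruby
# Illustrative sketch of the batching rule: serialized actions are
# accumulated into a chunk until adding the next one would push the
# chunk past TARGET_BULK_BYTES; an oversized single document still
# goes out alone as its own request.
TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20 MiB

def partition_bulk(payloads, limit = TARGET_BULK_BYTES)
  chunks = [[]]
  size = 0
  payloads.each do |payload|
    # Start a new chunk when the limit would be exceeded, unless the
    # current chunk is empty (a single oversized document is sent alone).
    if size + payload.bytesize > limit && !chunks.last.empty?
      chunks << []
      size = 0
    end
    chunks.last << payload
    size += payload.bytesize
  end
  chunks
end
```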

DNS Caching

This plugin uses the JVM to look up DNS entries and is subject to the value of the JVM networking property `networkaddress.cache.ttl`, a global setting for the JVM.

As an example, to set your DNS TTL to 1 second you would set the `LS_JAVA_OPTS` environment variable to `-Dnetworkaddress.cache.ttl=1`.

Keep in mind that a connection with keepalive enabled will not reevaluate its DNS value while the keepalive is in effect.

HTTP Compression

This plugin supports request and response compression. Response compression is enabled by default; for Elasticsearch versions 5.0 and later, the user doesn't have to set any configuration in Elasticsearch for it to send back compressed responses. For versions before 5.0, `http.compression` must be set to `true` in Elasticsearch to take advantage of response compression when using this plugin.

For request compression, regardless of the Elasticsearch version, users have to enable the `http_compression` setting in their Logstash config file.
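What request compression does to the wire payload can be illustrated with Ruby's standard Zlib library. This is a sketch of the general mechanism, not the plugin's HTTP client code:

```ruby
require 'zlib'

# A bulk request body is NDJSON: one action line and one source line
# per event, newline-terminated.
bulk_body = [
  '{"index":{"_index":"logstash-2024.01.01"}}',
  '{"message":"hello world"}'
].join("\n") + "\n"

# With request compression enabled, the body is gzipped before sending.
compressed = Zlib.gzip(bulk_body)

# The compressed body round-trips losslessly on the receiving side.
Zlib.gunzip(compressed) == bulk_body # => true
```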

Defined Under Namespace

Modules: DataStreamSupport, HttpClientBuilder, Ilm Classes: EventActionTuple, EventMappingError, FailedEventMapping, HttpClient, IndexInterpolationError, LicenseChecker, MapEventsResult, TemplateManager, UnsupportedActionError

Constant Summary

TARGET_BULK_BYTES =

This is a constant instead of a config option because there really isn’t a good reason to configure it.

The criteria used are:

  1. We need a number that's less than 100 MiB because ES won't accept bulks larger than that.

  2. It must be large enough to amortize the connection constant across multiple requests.

  3. It must be small enough that even if multiple threads hit this size we won’t use a lot of heap.

We wound up agreeing that a number greater than 10 MiB and less than 100 MiB made sense. We picked one on the lowish side to not use too much heap.

20 * 1024 * 1024
DEFAULT_POLICY =
"logstash-policy"

Constants included from PluginMixins::ElasticSearch::APIConfigs

PluginMixins::ElasticSearch::APIConfigs::CONFIG_PARAMS, PluginMixins::ElasticSearch::APIConfigs::DEFAULT_HOST, PluginMixins::ElasticSearch::APIConfigs::DEFAULT_ZIP_LEVEL

Constants included from Ilm

Ilm::ILM_POLICY_PATH

Constants included from PluginMixins::ElasticSearch::Common

PluginMixins::ElasticSearch::Common::DOC_CONFLICT_CODE, PluginMixins::ElasticSearch::Common::DOC_DLQ_CODES, PluginMixins::ElasticSearch::Common::DOC_SUCCESS_CODES

Instance Attribute Summary

Attributes included from PluginMixins::ElasticSearch::Common

#hosts

Instance Method Summary

Methods included from DataStreamSupport

#data_stream_config?, included

Methods included from PluginMixins::ElasticSearch::APIConfigs

included

Methods included from Ilm

#ilm_in_use?, #setup_ilm

Methods included from PluginMixins::ElasticSearch::Common

#alive_urls_count, #build_client, #discover_cluster_uuid, #effectively_ssl?, #fill_hosts_from_cloud_id, #handle_dlq_response, #last_es_version, #maximum_seen_major_version, #next_sleep_interval, #retrying_submit, #serverless?, #setup_hosts, #sleep_for_interval, #stoppable_sleep, #successful_connection?

Constructor Details

#initialize(*params) ⇒ ElasticSearch

Returns a new instance of ElasticSearch.



# File 'lib/logstash/outputs/elasticsearch.rb', line 275

def initialize(*params)
  super
  setup_ecs_compatibility_related_defaults
  setup_ssl_params!
  setup_compression_level!
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



# File 'lib/logstash/outputs/elasticsearch.rb', line 270

def client
  @client
end

#default_ilm_rollover_alias ⇒ Object (readonly)

Returns the value of attribute default_ilm_rollover_alias.



# File 'lib/logstash/outputs/elasticsearch.rb', line 272

def default_ilm_rollover_alias
  @default_ilm_rollover_alias
end

#default_index ⇒ Object (readonly)

Returns the value of attribute default_index.



# File 'lib/logstash/outputs/elasticsearch.rb', line 271

def default_index
  @default_index
end

#default_template_name ⇒ Object (readonly)

Returns the value of attribute default_template_name.



# File 'lib/logstash/outputs/elasticsearch.rb', line 273

def default_template_name
  @default_template_name
end

Instance Method Details

#close ⇒ Object



# File 'lib/logstash/outputs/elasticsearch.rb', line 461

def close
  @stopping.make_true if @stopping
  stop_after_successful_connection_thread
  @client.close if @client
end

#config_init(params) ⇒ Object



# File 'lib/logstash/outputs/elasticsearch.rb', line 377

def config_init(params)
  proxy = params['proxy']
  if proxy.is_a?(String)
    # environment variables references aren't yet resolved
    proxy = deep_replace(proxy)
    if proxy.empty?
      params.delete('proxy')
      @proxy = ''
    else
      params['proxy'] = proxy # do not do resolving again
    end
  end

  super(params)
end

#finish_register ⇒ Object



# File 'lib/logstash/outputs/elasticsearch.rb', line 368

def finish_register
  assert_es_version_supports_data_streams if data_stream_config?
  discover_cluster_uuid
  install_template
  setup_ilm if ilm_in_use?
  super
end

#map_events(events) ⇒ Object



# File 'lib/logstash/outputs/elasticsearch.rb', line 436

def map_events(events)
  safe_interpolation_map_events(events).successful_events
end

#multi_receive(events) ⇒ Object

Receive an array of events and immediately attempt to index them (no buffering)



# File 'lib/logstash/outputs/elasticsearch.rb', line 394

def multi_receive(events)
  wait_for_successful_connection if @after_successful_connection_done
  events_mapped = safe_interpolation_map_events(events)
  retrying_submit(events_mapped.successful_events)
  unless events_mapped.event_mapping_errors.empty?
    handle_event_mapping_errors(events_mapped.event_mapping_errors)
  end
end

#register ⇒ Object



# File 'lib/logstash/outputs/elasticsearch.rb', line 282

def register
  if !failure_type_logging_whitelist.empty?
    log_message = "'failure_type_logging_whitelist' is deprecated and in a future version of Elasticsearch " +
      "output plugin will be removed, please use 'silence_errors_in_log' instead."
    @deprecation_logger.deprecated log_message
    @logger.warn log_message
    @silence_errors_in_log = silence_errors_in_log | failure_type_logging_whitelist
  end

  @after_successful_connection_done = Concurrent::AtomicBoolean.new(false)
  @stopping = Concurrent::AtomicBoolean.new(false)

  check_action_validity

  @logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized).map(&:to_s))

  # the license_checking behaviour in the Pool class is externalized in the LogStash::ElasticSearchOutputLicenseChecker
  # class defined in license_check.rb. This license checking is specific to the elasticsearch output here and passed
  # to build_client down to the Pool class.
  @client = build_client(LicenseChecker.new(@logger))

  # Avoids race conditions in the @data_stream_config initialization (invoking check_data_stream_config! twice).
  # It's being concurrently invoked by this register method and by the finish_register on the @after_successful_connection_thread
  data_stream_enabled = data_stream_config?

  setup_template_manager_defaults(data_stream_enabled)
  # To support BWC, we check if DLQ exists in core (< 5.4). If it doesn't, we use nil to resort to previous behavior.
  @dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil

  @dlq_codes = DOC_DLQ_CODES.to_set

  if dlq_enabled?
    check_dlq_custom_codes
    @dlq_codes.merge(dlq_custom_codes)
  else
    raise LogStash::ConfigurationError, "DLQ feature (dlq_custom_codes) is configured while DLQ is not enabled" unless dlq_custom_codes.empty?
  end

  setup_mapper_and_target(data_stream_enabled)

  @bulk_request_metrics = metric.namespace(:bulk_requests)
  @document_level_metrics = metric.namespace(:documents)

  @shutdown_from_finish_register = Concurrent::AtomicBoolean.new(false)
  @after_successful_connection_thread = after_successful_connection do
    begin
      finish_register
      true # thread.value
    rescue LogStash::ConfigurationError, LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
      return e if pipeline_shutdown_requested?

      # retry when 429
      @logger.debug("Received a 429 status code during registration. Retrying..") && retry if too_many_requests?(e)

      # shut down pipeline
      if execution_context&.agent.respond_to?(:stop_pipeline)
        details = { message: e.message, exception: e.class }
        details[:backtrace] = e.backtrace if @logger.debug?
        @logger.error("Failed to bootstrap. Pipeline \"#{execution_context.pipeline_id}\" is going to shut down", details)

        @shutdown_from_finish_register.make_true
        execution_context.agent.stop_pipeline(execution_context.pipeline_id)
      end

      e
    rescue => e
      e # thread.value
    ensure
      @after_successful_connection_done.make_true
    end
  end

end

#setup_mapper_and_target(data_stream_enabled) ⇒ Object



# File 'lib/logstash/outputs/elasticsearch.rb', line 356

def setup_mapper_and_target(data_stream_enabled)
  if data_stream_enabled
    @event_mapper = -> (e) { data_stream_event_action_tuple(e) }
    @event_target = -> (e) { data_stream_name(e) }
    @index = "#{data_stream_type}-#{data_stream_dataset}-#{data_stream_namespace}".freeze # default name
  else
    @event_mapper = -> (e) { event_action_tuple(e) }
    @event_target = -> (e) { e.sprintf(@index) }
  end
end
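With the plugin's default data stream settings (`data_stream_type => logs`, `data_stream_dataset => generic`, `data_stream_namespace => default`), the default name built above resolves as follows. This is a sketch mirroring the interpolation in `setup_mapper_and_target`; `default_data_stream_name` is a hypothetical helper, not a plugin method:

```ruby
# Mirrors the "#{type}-#{dataset}-#{namespace}" interpolation used to
# build the default data stream name above.
def default_data_stream_name(type, dataset, namespace)
  "#{type}-#{dataset}-#{namespace}".freeze
end

default_data_stream_name("logs", "generic", "default")
# => "logs-generic-default"
```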