Class: LogStash::Outputs::ElasticSearch
- Inherits:
-
Base
- Object
- Base
- LogStash::Outputs::ElasticSearch
- Includes:
- DataStreamSupport, Ilm, PluginMixins::DeprecationLoggerSupport, PluginMixins::ElasticSearch::APIConfigs, PluginMixins::ElasticSearch::Common, PluginMixins::NormalizeConfigSupport
- Defined in:
- lib/logstash/outputs/elasticsearch.rb,
lib/logstash/outputs/elasticsearch/ilm.rb,
lib/logstash/outputs/elasticsearch/http_client.rb,
lib/logstash/outputs/elasticsearch/license_checker.rb,
lib/logstash/outputs/elasticsearch/http_client/pool.rb,
lib/logstash/outputs/elasticsearch/template_manager.rb,
lib/logstash/outputs/elasticsearch/data_stream_support.rb,
lib/logstash/outputs/elasticsearch/http_client_builder.rb,
lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb
Overview
Compatibility Note
NOTE: Starting with Elasticsearch 5.3, there’s an HTTP setting (documented in modules-http.html of the Elasticsearch reference) called `http.content_type.required`. If this option is set to `true`, and you are using Logstash 2.4 through 5.2, you need to update the Elasticsearch output plugin to version 6.2.5 or higher.
This plugin is the recommended method of storing logs in Elasticsearch. If you plan on using the Kibana web interface, you’ll want to use this output.
This output only speaks the HTTP protocol. HTTP is the preferred protocol for interacting with Elasticsearch as of Logstash 2.0. We strongly encourage the use of HTTP over the node protocol for a number of reasons. HTTP is only marginally slower, yet far easier to administer and work with. When using the HTTP protocol one may upgrade Elasticsearch versions without having to upgrade Logstash in lock-step.
You can learn more about Elasticsearch at <www.elastic.co/products/elasticsearch>
Template management for Elasticsearch 5.x
The index template for this version (Logstash 5.0) has been changed to reflect Elasticsearch’s mapping changes in version 5.0. Most importantly, the subfield for string multi-fields has changed from `.raw` to `.keyword` to match ES default behavior.
Users installing ES 5.x and LS 5.x: This change will not affect you and you will continue to use the ES defaults.
Users upgrading from LS 2.x to LS 5.x with ES 5.x: LS will not force an upgrade of the template if a `logstash` template already exists. This means you will still use `.raw` for sub-fields coming from 2.x. If you choose to use the new template, you will have to reindex your data after the new template is installed.
Retry Policy
The retry policy has changed significantly in the 2.2.0 release. This plugin uses the Elasticsearch bulk API to optimize its imports into Elasticsearch. These requests may experience either partial or total failures.
The following errors are retried infinitely:
- Network errors (inability to connect)
- 429 (Too many requests) errors
- 503 (Service unavailable) errors
NOTE: 409 exceptions are no longer retried. Please set a higher `retry_on_conflict` value if you experience 409 exceptions. It is more performant for Elasticsearch to retry these exceptions than this plugin.
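The retry rules above can be sketched as a small classifier. This is a minimal illustration only, not the plugin's code: `retry_decision`, `SKETCH_RETRYABLE_CODES`, and the use of `nil` to stand for a network error are all assumptions made for this example.

```ruby
# Hypothetical sketch of the retry policy described above.
# None of these names come from the plugin itself.
SKETCH_RETRYABLE_CODES = [429, 503].freeze
SKETCH_CONFLICT_CODE = 409

# status: an Integer HTTP status, or nil when the request never
# reached Elasticsearch (assumed stand-in for a network error).
def retry_decision(status)
  return :retry if status.nil?
  return :retry if SKETCH_RETRYABLE_CODES.include?(status)
  return :no_retry if status == SKETCH_CONFLICT_CODE
  :not_covered_here # other codes are outside the scope of this section
end

puts retry_decision(429)
puts retry_decision(409)
```

Codes that this section does not mention are deliberately left unclassified in the sketch rather than guessed at.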
Batch Sizes
This plugin attempts to send batches of events as a single request. However, if a request exceeds 20MB (TARGET_BULK_BYTES, 20 * 1024 * 1024 bytes), it is broken up into multiple batch requests. If a single document exceeds 20MB, it is sent as a single request.
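One way to picture the splitting rule is the sketch below. It assumes documents are already serialized strings and that batches are filled greedily in order; `split_into_batches` and `SKETCH_TARGET_BULK_BYTES` are names invented for this example, and the plugin's actual splitting logic may differ in detail.

```ruby
# Hypothetical sketch: greedy size-based batching.
SKETCH_TARGET_BULK_BYTES = 20 * 1024 * 1024

# Groups serialized documents into batches whose combined size stays at or
# under the limit. A document larger than the limit ends up alone in its
# own batch, since it cannot be split further.
def split_into_batches(docs, limit = SKETCH_TARGET_BULK_BYTES)
  batches = []
  current = []
  current_size = 0
  docs.each do |doc|
    size = doc.bytesize
    # Flush the current batch if adding this document would exceed the limit.
    if !current.empty? && current_size + size > limit
      batches << current
      current = []
      current_size = 0
    end
    current << doc
    current_size += size
  end
  batches << current unless current.empty?
  batches
end

# A tiny limit makes the behaviour easy to see.
p split_into_batches(["aaaa", "bb", "cccccccccc", "d"], 6)
```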
DNS Caching
This plugin uses the JVM to look up DNS entries and is subject to the value of networkaddress.cache.ttl (see docs.oracle.com/javase/7/docs/technotes/guides/net/properties.html), a global setting for the JVM.
As an example, to set your DNS TTL to 1 second you would set the `LS_JAVA_OPTS` environment variable to `-Dnetworkaddress.cache.ttl=1`.
Keep in mind that a connection with keepalive enabled will not reevaluate its DNS value while the keepalive is in effect.
HTTP Compression
This plugin supports request and response compression. Response compression is enabled by default, and for Elasticsearch versions 5.0 and later the user doesn’t have to set any configs in Elasticsearch for it to send back compressed responses. For versions before 5.0, `http.compression` must be set to `true` in Elasticsearch to take advantage of response compression when using this plugin.
For request compression, regardless of the Elasticsearch version, users have to enable the `http_compression` setting in their Logstash config file.
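To give a feel for what request compression does to a bulk body, the sketch below gzips a newline-delimited payload with Ruby's standard `zlib` library and reads it back. It is an illustration only: `gzip_body` and `gunzip_body` are hypothetical helpers, and how the plugin itself compresses requests is not shown in this section.

```ruby
require "zlib"
require "stringio"

# Hypothetical helper: gzip a request body at the given compression level.
def gzip_body(body, level = Zlib::DEFAULT_COMPRESSION)
  buffer = StringIO.new
  buffer.set_encoding(Encoding::BINARY)
  writer = Zlib::GzipWriter.new(buffer, level)
  writer.write(body)
  writer.close # also finishes the gzip stream
  buffer.string
end

# Hypothetical helper: reverse of gzip_body, as a receiver would do.
def gunzip_body(data)
  Zlib::GzipReader.new(StringIO.new(data)).read
end

# A made-up, repetitive bulk-style payload.
body = (%({"index":{}}\n{"message":"hello"}\n)) * 100
compressed = gzip_body(body)
puts "#{body.bytesize} bytes -> #{compressed.bytesize} bytes"
```

A compressed HTTP request body is normally sent with a `Content-Encoding: gzip` header so the receiver knows to decompress it.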
Defined Under Namespace
Modules: DataStreamSupport, HttpClientBuilder, Ilm
Classes: EventActionTuple, EventMappingError, FailedEventMapping, HttpClient, IndexInterpolationError, LicenseChecker, MapEventsResult, TemplateManager, UnsupportedActionError
Constant Summary
- TARGET_BULK_BYTES = 20 * 1024 * 1024
This is a constant instead of a config option because there really isn’t a good reason to configure it.
The criteria used are:
- We need a number that’s less than 100MiB because ES won’t accept bulks larger than that.
- It must be large enough to amortize the connection constant across multiple requests.
- It must be small enough that even if multiple threads hit this size we won’t use a lot of heap.
We wound up agreeing that a number greater than 10 MiB and less than 100MiB made sense. We picked one on the lowish side to not use too much heap.
- DEFAULT_POLICY = "logstash-policy"
Constants included from PluginMixins::ElasticSearch::APIConfigs
PluginMixins::ElasticSearch::APIConfigs::CONFIG_PARAMS, PluginMixins::ElasticSearch::APIConfigs::DEFAULT_HOST, PluginMixins::ElasticSearch::APIConfigs::DEFAULT_ZIP_LEVEL
Constants included from Ilm
Constants included from PluginMixins::ElasticSearch::Common
PluginMixins::ElasticSearch::Common::DOC_CONFLICT_CODE, PluginMixins::ElasticSearch::Common::DOC_DLQ_CODES, PluginMixins::ElasticSearch::Common::DOC_SUCCESS_CODES
Instance Attribute Summary
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#default_ilm_rollover_alias ⇒ Object
readonly
Returns the value of attribute default_ilm_rollover_alias.
-
#default_index ⇒ Object
readonly
Returns the value of attribute default_index.
-
#default_template_name ⇒ Object
readonly
Returns the value of attribute default_template_name.
Attributes included from PluginMixins::ElasticSearch::Common
Instance Method Summary
- #close ⇒ Object
- #config_init(params) ⇒ Object
- #finish_register ⇒ Object
-
#initialize(*params) ⇒ ElasticSearch
constructor
A new instance of ElasticSearch.
- #map_events(events) ⇒ Object
-
#multi_receive(events) ⇒ Object
Receive an array of events and immediately attempt to index them (no buffering).
- #register ⇒ Object
- #setup_mapper_and_target(data_stream_enabled) ⇒ Object
Methods included from DataStreamSupport
#data_stream_config?, included
Methods included from PluginMixins::ElasticSearch::APIConfigs
Methods included from Ilm
Methods included from PluginMixins::ElasticSearch::Common
#alive_urls_count, #build_client, #discover_cluster_uuid, #effectively_ssl?, #fill_hosts_from_cloud_id, #handle_dlq_response, #last_es_version, #maximum_seen_major_version, #next_sleep_interval, #retrying_submit, #serverless?, #setup_hosts, #sleep_for_interval, #stoppable_sleep, #successful_connection?
Constructor Details
#initialize(*params) ⇒ ElasticSearch
Returns a new instance of ElasticSearch.
# File 'lib/logstash/outputs/elasticsearch.rb', line 275

def initialize(*params)
  super
  setup_ssl_params!
  setup_compression_level!
end
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
# File 'lib/logstash/outputs/elasticsearch.rb', line 270

def client
  @client
end
#default_ilm_rollover_alias ⇒ Object (readonly)
Returns the value of attribute default_ilm_rollover_alias.
# File 'lib/logstash/outputs/elasticsearch.rb', line 272

def default_ilm_rollover_alias
  @default_ilm_rollover_alias
end
#default_index ⇒ Object (readonly)
Returns the value of attribute default_index.
# File 'lib/logstash/outputs/elasticsearch.rb', line 271

def default_index
  @default_index
end
#default_template_name ⇒ Object (readonly)
Returns the value of attribute default_template_name.
# File 'lib/logstash/outputs/elasticsearch.rb', line 273

def default_template_name
  @default_template_name
end
Instance Method Details
#close ⇒ Object
# File 'lib/logstash/outputs/elasticsearch.rb', line 461

def close
  @stopping.make_true if @stopping
  stop_after_successful_connection_thread
  @client.close if @client
end
#config_init(params) ⇒ Object
# File 'lib/logstash/outputs/elasticsearch.rb', line 377

def config_init(params)
  proxy = params['proxy']
  if proxy.is_a?(String)
    # environment variables references aren't yet resolved
    proxy = deep_replace(proxy)
    if proxy.empty?
      params.delete('proxy')
      @proxy = ''
    else
      params['proxy'] = proxy # do not do resolving again
    end
  end
  super(params)
end
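The effect of this proxy handling can be tried outside Logstash with the sketch below. `resolve_env` is a simplified, hypothetical stand-in for `deep_replace` that only substitutes `${VAR}` references from a plain hash, and `normalize_proxy` is likewise invented for this example; neither reproduces the real implementation.

```ruby
# Hypothetical stand-in for deep_replace: substitutes ${VAR} references
# using the given hash, with unset variables becoming empty strings.
def resolve_env(value, env)
  value.gsub(/\$\{(\w+)\}/) { env.fetch(Regexp.last_match(1), "") }
end

# Mirrors the shape of config_init: an empty resolved proxy is dropped from
# the params, a non-empty one is stored back already resolved.
def normalize_proxy(params, env)
  proxy = params["proxy"]
  return params unless proxy.is_a?(String)

  resolved = resolve_env(proxy, env)
  if resolved.empty?
    params.delete("proxy")
  else
    params["proxy"] = resolved
  end
  params
end

p normalize_proxy({ "proxy" => "${HTTP_PROXY}" }, {})
p normalize_proxy({ "proxy" => "${HTTP_PROXY}" }, { "HTTP_PROXY" => "http://proxy.example:3128" })
```

The point of the pattern is that a proxy setting which resolves to an empty string is treated as "no proxy configured" rather than being passed on as an invalid value.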
#finish_register ⇒ Object
# File 'lib/logstash/outputs/elasticsearch.rb', line 368

def finish_register
  assert_es_version_supports_data_streams if data_stream_config?
  discover_cluster_uuid
  install_template
  setup_ilm if ilm_in_use?
  super
end
#map_events(events) ⇒ Object
# File 'lib/logstash/outputs/elasticsearch.rb', line 436

def map_events(events)
  safe_interpolation_map_events(events).successful_events
end
#multi_receive(events) ⇒ Object
Receive an array of events and immediately attempt to index them (no buffering)
# File 'lib/logstash/outputs/elasticsearch.rb', line 394

def multi_receive(events)
  wait_for_successful_connection if @after_successful_connection_done
  events_mapped = safe_interpolation_map_events(events)
  retrying_submit(events_mapped.successful_events)
  unless events_mapped.event_mapping_errors.empty?
    handle_event_mapping_errors(events_mapped.event_mapping_errors)
  end
end
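The split between successfully mapped events and mapping errors can be sketched as follows. This is a rough illustration under stated assumptions: `SketchMapResult` and `safe_map` are hypothetical, the mapping step is supplied as a block, and `ArgumentError` stands in for whatever error the real mapping raises.

```ruby
# Hypothetical result container with the two fields used in multi_receive.
SketchMapResult = Struct.new(:successful_events, :event_mapping_errors)

# Maps each event with the given block. Events whose mapping raises are
# collected separately instead of aborting the whole batch.
def safe_map(events)
  successes = []
  failures = []
  events.each do |event|
    begin
      successes << yield(event)
    rescue ArgumentError => e
      failures << [event, e.message]
    end
  end
  SketchMapResult.new(successes, failures)
end

result = safe_map([1, "x", 3]) do |event|
  raise ArgumentError, "not a number" unless event.is_a?(Integer)
  event * 2
end
p result.successful_events
p result.event_mapping_errors
```

Keeping failures out of the success list lets the good events be submitted while the bad ones are handled on their own path.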
#register ⇒ Object
# File 'lib/logstash/outputs/elasticsearch.rb', line 282

def register
  if !failure_type_logging_whitelist.empty?
    message = "'failure_type_logging_whitelist' is deprecated and in a future version of Elasticsearch " +
      "output plugin will be removed, please use 'silence_errors_in_log' instead."
    @deprecation_logger.deprecated message
    @logger.warn message
    @silence_errors_in_log = silence_errors_in_log | failure_type_logging_whitelist
  end

  @after_successful_connection_done = Concurrent::AtomicBoolean.new(false)
  @stopping = Concurrent::AtomicBoolean.new(false)

  check_action_validity

  @logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized).map(&:to_s))

  # the license_checking behaviour in the Pool class is externalized in the LogStash::ElasticSearchOutputLicenseChecker
  # class defined in license_check.rb. This license checking is specific to the elasticsearch output here and passed
  # to build_client down to the Pool class.
  @client = build_client(LicenseChecker.new(@logger))

  # Avoids race conditions in the @data_stream_config initialization (invoking check_data_stream_config! twice).
  # It's being concurrently invoked by this register method and by the finish_register on the @after_successful_connection_thread
  data_stream_enabled = data_stream_config?

  setup_template_manager_defaults(data_stream_enabled)
  # To support BWC, we check if DLQ exists in core (< 5.4). If it doesn't, we use nil to resort to previous behavior.
  @dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil

  @dlq_codes = DOC_DLQ_CODES.to_set

  if dlq_enabled?
    check_dlq_custom_codes
    @dlq_codes.merge(dlq_custom_codes)
  else
    raise LogStash::ConfigurationError, "DLQ feature (dlq_custom_codes) is configured while DLQ is not enabled" unless dlq_custom_codes.empty?
  end

  setup_mapper_and_target(data_stream_enabled)

  @bulk_request_metrics = metric.namespace(:bulk_requests)
  @document_level_metrics = metric.namespace(:documents)

  @shutdown_from_finish_register = Concurrent::AtomicBoolean.new(false)
  @after_successful_connection_thread = after_successful_connection do
    begin
      finish_register
      true # thread.value
    rescue LogStash::ConfigurationError, LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
      return e if pipeline_shutdown_requested?

      # retry when 429
      @logger.debug("Received a 429 status code during registration. Retrying..") && retry if too_many_requests?(e)

      # shut down pipeline
      if execution_context&.agent.respond_to?(:stop_pipeline)
        details = { message: e.message, exception: e.class }
        details[:backtrace] = e.backtrace if @logger.debug?
        @logger.error("Failed to bootstrap. Pipeline \"#{execution_context.pipeline_id}\" is going to shut down", details)

        @shutdown_from_finish_register.make_true
        execution_context.agent.stop_pipeline(execution_context.pipeline_id)
      end

      e
    rescue => e
      e # thread.value
    ensure
      @after_successful_connection_done.make_true
    end
  end
end
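The `retry` call in the rescue clause above re-runs the enclosing `begin` block. A stripped-down sketch of that Ruby idiom is shown below; `SketchTooManyRequests` and `with_retry_on_429` are hypothetical names, and unlike `register` the sketch caps the number of attempts so that it always terminates.

```ruby
# Hypothetical error class standing in for a 429 response.
class SketchTooManyRequests < StandardError; end

# Runs the block, re-running it from the top whenever it raises the
# 429 stand-in, up to max_attempts times. The attempt cap is an
# addition made for this sketch.
def with_retry_on_429(max_attempts)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue SketchTooManyRequests
    retry if attempts < max_attempts
    raise
  end
end

seen = []
outcome = with_retry_on_429(5) do |attempt|
  seen << attempt
  raise SketchTooManyRequests if attempt < 3
  :registered
end
p outcome
p seen
```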
#setup_mapper_and_target(data_stream_enabled) ⇒ Object
# File 'lib/logstash/outputs/elasticsearch.rb', line 356

def setup_mapper_and_target(data_stream_enabled)
  if data_stream_enabled
    @event_mapper = -> (e) { data_stream_event_action_tuple(e) }
    @event_target = -> (e) { data_stream_name(e) }
    @index = "#{data_stream_type}-#{data_stream_dataset}-#{data_stream_namespace}".freeze # default name
  else
    @event_mapper = -> (e) { event_action_tuple(e) }
    @event_target = -> (e) { e.sprintf(@index) }
  end
end
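The method picks one of two lambdas up front so that later code can call the target resolver without re-checking the mode. A simplified sketch of that pattern follows. `build_target` is hypothetical, events are plain hashes, the data stream branch returns only the fixed default name (the real `data_stream_name(e)` receives the event), and the `%{field}` substitution is a rough stand-in for `Event#sprintf`.

```ruby
# Hypothetical sketch: choose a target-resolving lambda once, based on mode.
def build_target(data_stream_enabled, settings)
  if data_stream_enabled
    # Default data stream name in the type-dataset-namespace form.
    name = "#{settings[:type]}-#{settings[:dataset]}-#{settings[:namespace]}".freeze
    ->(_event) { name }
  else
    pattern = settings[:index]
    # Rough stand-in for sprintf-style field interpolation on a hash event.
    ->(event) { pattern.gsub(/%\{(\w+)\}/) { event.fetch(Regexp.last_match(1), "") } }
  end
end

stream_target = build_target(true, { type: "logs", dataset: "generic", namespace: "default" })
index_target = build_target(false, { index: "app-%{env}" })

puts stream_target.call({})
puts index_target.call({ "env" => "prod" })
```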