Class: LogStash::Outputs::OpenSearch

Inherits:
Base
  • Object
show all
Includes:
PluginMixins::OpenSearch::APIConfigs, PluginMixins::OpenSearch::Common
Defined in:
lib/logstash/outputs/opensearch.rb,
lib/logstash/outputs/opensearch/http_client.rb,
lib/logstash/outputs/opensearch/http_client/pool.rb,
lib/logstash/outputs/opensearch/template_manager.rb,
lib/logstash/outputs/opensearch/http_client_builder.rb,
lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb

Overview

This plugin is the recommended method of storing logs in OpenSearch. If you plan on using the OpenSearch Dashboards web interface, you’ll want to use this output.

This output only speaks the HTTP protocol. HTTP is the preferred protocol for interacting with OpenSearch. We strongly encourage the use of HTTP over the node protocol for a number of reasons. HTTP is only marginally slower, yet far easier to administer and work with. When using the HTTP protocol one may upgrade OpenSearch versions without having to upgrade Logstash in lock-step.

You can learn more about OpenSearch at <opensearch.org/>

Retry Policy

This plugin uses the OpenSearch bulk API to optimize its imports into OpenSearch. These requests may experience either partial or total failures.

The following errors are retried infinitely:

  • Network errors (inability to connect)

  • 429 (Too many requests) and

  • 503 (Service unavailable) errors

NOTE: 409 exceptions are no longer retried. Please set a higher ‘retry_on_conflict` value if you experience 409 exceptions. It is more performant for OpenSearch to retry these exceptions than this plugin.

Batch Sizes ====

This plugin attempts to send batches of events as a single request. However, if a request exceeds 20MB we will break it up until multiple batch requests. If a single document exceeds 20MB it will be sent as a single request.

DNS Caching

This plugin uses the JVM to lookup DNS entries and is subject to the value of docs.oracle.com/javase/7/docs/technotes/guides/net/properties.html[networkaddress.cache.ttl], a global setting for the JVM.

As an example, to set your DNS TTL to 1 second you would set the ‘LS_JAVA_OPTS` environment variable to `-Dnetworkaddress.cache.ttl=1`.

Keep in mind that a connection with keepalive enabled will not reevaluate its DNS value while the keepalive is in effect.

HTTP Compression

This plugin supports request and response compression. Response compression is enabled by default, the user doesn’t have to set any configs in OpenSearch for it to send back compressed response.

For requests compression, users have to enable ‘http_compression` setting in their Logstash config file.

Defined Under Namespace

Modules: HttpClientBuilder Classes: EventActionTuple, HttpClient, TemplateManager

Constant Summary

Constants included from PluginMixins::OpenSearch::APIConfigs

PluginMixins::OpenSearch::APIConfigs::CONFIG_PARAMS, PluginMixins::OpenSearch::APIConfigs::DEFAULT_HOST

Constants included from PluginMixins::OpenSearch::Common

PluginMixins::OpenSearch::Common::DOC_CONFLICT_CODE, PluginMixins::OpenSearch::Common::DOC_DLQ_CODES, PluginMixins::OpenSearch::Common::DOC_SUCCESS_CODES

Instance Attribute Summary collapse

Attributes included from PluginMixins::OpenSearch::Common

#hosts

Instance Method Summary collapse

Methods included from PluginMixins::OpenSearch::APIConfigs

included

Methods included from PluginMixins::OpenSearch::Common

#build_client, #discover_cluster_uuid, #handle_dlq_status, #last_version, #maximum_seen_major_version, #next_sleep_interval, #retrying_submit, #setup_hosts, #sleep_for_interval, #stoppable_sleep, #successful_connection?

Constructor Details

#initialize(*params) ⇒ OpenSearch

Returns a new instance of OpenSearch.



214
215
216
217
# File 'lib/logstash/outputs/opensearch.rb', line 214

def initialize(*params)
  super
  setup_ecs_compatibility_related_defaults
end

Instance Attribute Details

#clientObject (readonly)

Returns the value of attribute client.



210
211
212
# File 'lib/logstash/outputs/opensearch.rb', line 210

def client
  @client
end

#default_indexObject (readonly)

Returns the value of attribute default_index.



211
212
213
# File 'lib/logstash/outputs/opensearch.rb', line 211

def default_index
  @default_index
end

#default_template_nameObject (readonly)

Returns the value of attribute default_template_name.



212
213
214
# File 'lib/logstash/outputs/opensearch.rb', line 212

def default_template_name
  @default_template_name
end

Instance Method Details

#closeObject



300
301
302
303
304
# File 'lib/logstash/outputs/opensearch.rb', line 300

def close
  @stopping.make_true if @stopping
  stop_after_successful_connection_thread
  @client.close if @client
end

#config_init(params) ⇒ Object



259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/logstash/outputs/opensearch.rb', line 259

def config_init(params)
  proxy = params['proxy']
  if proxy.is_a?(String)
    # environment variables references aren't yet resolved
    proxy = deep_replace(proxy)
    if proxy.empty?
      params.delete('proxy')
      @proxy = ''
    else
      params['proxy'] = proxy # do not do resolving again
    end
  end
  super(params)
end

#finish_registerObject



252
253
254
255
256
# File 'lib/logstash/outputs/opensearch.rb', line 252

def finish_register
  discover_cluster_uuid
  install_template
  super
end

#map_events(events) ⇒ Object



280
281
282
# File 'lib/logstash/outputs/opensearch.rb', line 280

def map_events(events)
  events.map(&@event_mapper)
end

#multi_receive(events) ⇒ Object

Receive an array of events and immediately attempt to index them (no buffering)



275
276
277
278
# File 'lib/logstash/outputs/opensearch.rb', line 275

def multi_receive(events)
  wait_for_successful_connection if @after_successful_connection_done
  retrying_submit map_events(events)
end

#registerObject



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/logstash/outputs/opensearch.rb', line 219

def register
  @after_successful_connection_done = Concurrent::AtomicBoolean.new(false)
  @stopping = Concurrent::AtomicBoolean.new(false)

  check_action_validity

  @logger.info("New OpenSearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized).map(&:to_s))

  @client = build_client

  @after_successful_connection_thread = after_successful_connection do
    begin
      finish_register
      true # thread.value
    rescue => e
      # we do not want to halt the thread with an exception as that has consequences for LS
      e # thread.value
    ensure
      @after_successful_connection_done.make_true
    end
  end

  # To support BWC, we check if DLQ exists in core (< 5.4). If it doesn't, we use nil to resort to previous behavior.
  @dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil

  @event_mapper = -> (e) { event_action_tuple(e) }
  @event_target = -> (e) { e.sprintf(@index) }

  @bulk_request_metrics = metric.namespace(:bulk_requests)
  @document_level_metrics = metric.namespace(:documents)
end