Class: LogStash::Outputs::OpenSearch
- Inherits:
-
Base
- Object
- Base
- LogStash::Outputs::OpenSearch
- Defined in:
- lib/logstash/outputs/opensearch.rb,
lib/logstash/outputs/opensearch/http_client.rb,
lib/logstash/outputs/opensearch/http_client/pool.rb,
lib/logstash/outputs/opensearch/template_manager.rb,
lib/logstash/outputs/opensearch/http_client_builder.rb,
lib/logstash/outputs/opensearch/http_client/manticore_adapter.rb
Overview
This plugin is the recommended method of storing logs in OpenSearch. If you plan on using the OpenSearch Dashboards web interface, you’ll want to use this output.
This output only speaks the HTTP protocol. HTTP is the preferred protocol for interacting with OpenSearch. We strongly encourage the use of HTTP over the node protocol for a number of reasons. HTTP is only marginally slower, yet far easier to administer and work with. When using the HTTP protocol one may upgrade OpenSearch versions without having to upgrade Logstash in lock-step.
You can learn more about OpenSearch at <opensearch.org/>
Retry Policy
This plugin uses the OpenSearch bulk API to optimize its imports into OpenSearch. These requests may experience either partial or total failures.
The following errors are retried infinitely:
-
Network errors (inability to connect)
-
429 (Too many requests) and
-
503 (Service unavailable) errors
NOTE: 409 exceptions are no longer retried. Please set a higher ‘retry_on_conflict` value if you experience 409 exceptions. It is more performant for OpenSearch to retry these exceptions than this plugin.
Batch Sizes ====
This plugin attempts to send batches of events as a single request. However, if a request exceeds 20MB we will break it up until multiple batch requests. If a single document exceeds 20MB it will be sent as a single request.
DNS Caching
This plugin uses the JVM to lookup DNS entries and is subject to the value of docs.oracle.com/javase/7/docs/technotes/guides/net/properties.html[networkaddress.cache.ttl], a global setting for the JVM.
As an example, to set your DNS TTL to 1 second you would set the ‘LS_JAVA_OPTS` environment variable to `-Dnetworkaddress.cache.ttl=1`.
Keep in mind that a connection with keepalive enabled will not reevaluate its DNS value while the keepalive is in effect.
HTTP Compression
This plugin supports request and response compression. Response compression is enabled by default, the user doesn’t have to set any configs in OpenSearch for it to send back compressed response.
For requests compression, users have to enable ‘http_compression` setting in their Logstash config file.
Defined Under Namespace
Modules: HttpClientBuilder Classes: EventActionTuple, HttpClient, TemplateManager
Constant Summary
Constants included from PluginMixins::OpenSearch::APIConfigs
PluginMixins::OpenSearch::APIConfigs::CONFIG_PARAMS, PluginMixins::OpenSearch::APIConfigs::DEFAULT_HOST
Constants included from PluginMixins::OpenSearch::Common
PluginMixins::OpenSearch::Common::DOC_CONFLICT_CODE, PluginMixins::OpenSearch::Common::DOC_DLQ_CODES, PluginMixins::OpenSearch::Common::DOC_SUCCESS_CODES
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#default_index ⇒ Object
readonly
Returns the value of attribute default_index.
-
#default_template_name ⇒ Object
readonly
Returns the value of attribute default_template_name.
Attributes included from PluginMixins::OpenSearch::Common
Instance Method Summary collapse
- #close ⇒ Object
- #config_init(params) ⇒ Object
- #finish_register ⇒ Object
-
#initialize(*params) ⇒ OpenSearch
constructor
A new instance of OpenSearch.
- #map_events(events) ⇒ Object
-
#multi_receive(events) ⇒ Object
Receive an array of events and immediately attempt to index them (no buffering).
- #register ⇒ Object
Methods included from PluginMixins::OpenSearch::APIConfigs
Methods included from PluginMixins::OpenSearch::Common
#build_client, #discover_cluster_uuid, #handle_dlq_status, #last_version, #maximum_seen_major_version, #next_sleep_interval, #retrying_submit, #setup_hosts, #sleep_for_interval, #stoppable_sleep, #successful_connection?
Constructor Details
#initialize(*params) ⇒ OpenSearch
Returns a new instance of OpenSearch.
214 215 216 217 |
# File 'lib/logstash/outputs/opensearch.rb', line 214 def initialize(*params) super end |
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
210 211 212 |
# File 'lib/logstash/outputs/opensearch.rb', line 210 def client @client end |
#default_index ⇒ Object (readonly)
Returns the value of attribute default_index.
211 212 213 |
# File 'lib/logstash/outputs/opensearch.rb', line 211 def default_index @default_index end |
#default_template_name ⇒ Object (readonly)
Returns the value of attribute default_template_name.
212 213 214 |
# File 'lib/logstash/outputs/opensearch.rb', line 212 def default_template_name @default_template_name end |
Instance Method Details
#close ⇒ Object
300 301 302 303 304 |
# File 'lib/logstash/outputs/opensearch.rb', line 300 def close @stopping.make_true if @stopping stop_after_successful_connection_thread @client.close if @client end |
#config_init(params) ⇒ Object
259 260 261 262 263 264 265 266 267 268 269 270 271 272 |
# File 'lib/logstash/outputs/opensearch.rb', line 259 def config_init(params) proxy = params['proxy'] if proxy.is_a?(String) # environment variables references aren't yet resolved proxy = deep_replace(proxy) if proxy.empty? params.delete('proxy') @proxy = '' else params['proxy'] = proxy # do not do resolving again end end super(params) end |
#finish_register ⇒ Object
252 253 254 255 256 |
# File 'lib/logstash/outputs/opensearch.rb', line 252 def finish_register discover_cluster_uuid install_template super end |
#map_events(events) ⇒ Object
280 281 282 |
# File 'lib/logstash/outputs/opensearch.rb', line 280 def map_events(events) events.map(&@event_mapper) end |
#multi_receive(events) ⇒ Object
Receive an array of events and immediately attempt to index them (no buffering)
275 276 277 278 |
# File 'lib/logstash/outputs/opensearch.rb', line 275 def multi_receive(events) wait_for_successful_connection if @after_successful_connection_done map_events(events) end |
#register ⇒ Object
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 |
# File 'lib/logstash/outputs/opensearch.rb', line 219 def register @after_successful_connection_done = Concurrent::AtomicBoolean.new(false) @stopping = Concurrent::AtomicBoolean.new(false) check_action_validity @logger.info("New OpenSearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized).map(&:to_s)) @client = build_client @after_successful_connection_thread = after_successful_connection do begin finish_register true # thread.value rescue => e # we do not want to halt the thread with an exception as that has consequences for LS e # thread.value ensure @after_successful_connection_done.make_true end end # To support BWC, we check if DLQ exists in core (< 5.4). If it doesn't, we use nil to resort to previous behavior. @dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil @event_mapper = -> (e) { event_action_tuple(e) } @event_target = -> (e) { e.sprintf(@index) } @bulk_request_metrics = metric.namespace(:bulk_requests) @document_level_metrics = metric.namespace(:documents) end |