Module: LogStash::PluginMixins::OpenSearch::Common
- Included in:
- Outputs::OpenSearch
- Defined in:
- lib/logstash/plugin_mixins/opensearch/common.rb
Constant Summary collapse
- DOC_DLQ_CODES =
These codes apply to documents, not at the request level
[400, 404]
- DOC_SUCCESS_CODES =
[200, 201]
- DOC_CONFLICT_CODE =
409
Instance Attribute Summary collapse
-
#hosts ⇒ Object
readonly
This module defines common methods that can be reused by alternate opensearch output plugins.
Instance Method Summary collapse
-
#build_client ⇒ HttpClient
Perform some OpenSearch options validations and Build the HttpClient.
- #discover_cluster_uuid ⇒ Object
- #handle_dlq_status(message, action, status, response) ⇒ Object
- #last_version ⇒ Object
- #maximum_seen_major_version ⇒ Object
- #next_sleep_interval(current_interval) ⇒ Object
- #retrying_submit(actions) ⇒ Object
- #setup_hosts ⇒ Object
- #sleep_for_interval(sleep_interval) ⇒ Object
- #stoppable_sleep(interval) ⇒ Object
- #successful_connection? ⇒ Boolean
Instance Attribute Details
#hosts ⇒ Object (readonly)
This module defines common methods that can be reused by alternate opensearch output plugins.
17 18 19 |
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 17 def hosts @hosts end |
Instance Method Details
#build_client ⇒ HttpClient
Perform some OpenSearch options validations and Build the HttpClient. Note that this methods may sets the @user, @password, @hosts and @client ivars as a side effect.
27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 27 def build_client # the following 3 options validation & setup methods are called inside build_client # because they must be executed prior to building the client and logstash # monitoring and management rely on directly calling build_client setup_hosts params["metric"] = metric if @proxy.eql?('') @logger.warn "Supplied proxy setting (proxy => '') has no effect" end ::LogStash::Outputs::OpenSearch::HttpClientBuilder.build(@logger, @hosts, params) end |
#discover_cluster_uuid ⇒ Object
88 89 90 91 92 93 94 95 |
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 88 def discover_cluster_uuid return unless defined?() return if params && params['auth_type'] && params['auth_type']['service_name'] == "aoss" # AOSS doesn't support GET / cluster_info = client.get('/') .set(:cluster_uuid, cluster_info['cluster_uuid']) rescue => e @logger.error("Unable to retrieve OpenSearch cluster uuid", message: e., exception: e.class, backtrace: e.backtrace) end |
#handle_dlq_status(message, action, status, response) ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 139 def handle_dlq_status(, action, status, response) # To support bwc, we check if DLQ exists. otherwise we log and drop event (previous behavior) if @dlq_writer event, action = action.event, [action[0], action[1], action[2]] # TODO: Change this to send a map with { :status => status, :action => action } in the future @dlq_writer.write(event, "#{} status: #{status}, action: #{action}, response: #{response}") else if dig_value(response, 'index', 'error', 'type') == 'invalid_index_name_exception' level = :error else level = :warn end @logger.send level, , status: status, action: action, response: response end end |
#last_version ⇒ Object
61 62 63 |
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 61 def last_version client.last_version end |
#maximum_seen_major_version ⇒ Object
65 66 67 |
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 65 def maximum_seen_major_version client.maximum_seen_major_version end |
#next_sleep_interval(current_interval) ⇒ Object
134 135 136 137 |
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 134 def next_sleep_interval(current_interval) doubled = current_interval * 2 doubled > @retry_max_interval ? @retry_max_interval : doubled end |
#retrying_submit(actions) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 97 def (actions) # Initially we submit the full list of actions submit_actions = actions sleep_interval = @retry_initial_interval while submit_actions && submit_actions.length > 0 # We retry with whatever is didn't succeed begin submit_actions = submit(submit_actions) if submit_actions && submit_actions.size > 0 @logger.info("Retrying individual bulk actions that failed or were rejected by the previous bulk request", count: submit_actions.size) end rescue => e @logger.error("Encountered an unexpected error submitting a bulk request, will retry", message: e., exception: e.class, backtrace: e.backtrace) end # Everything was a success! break if !submit_actions || submit_actions.empty? # If we're retrying the action sleep for the recommended interval # Double the interval for the next time through to achieve exponential backoff sleep_interval = sleep_for_interval(sleep_interval) end end |
#setup_hosts ⇒ Object
41 42 43 44 45 46 47 |
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 41 def setup_hosts @hosts = Array(@hosts) if @hosts.empty? @logger.info("No 'host' set in opensearch output. Defaulting to localhost") @hosts.replace(["localhost"]) end end |
#sleep_for_interval(sleep_interval) ⇒ Object
125 126 127 128 |
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 125 def sleep_for_interval(sleep_interval) stoppable_sleep(sleep_interval) next_sleep_interval(sleep_interval) end |
#stoppable_sleep(interval) ⇒ Object
130 131 132 |
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 130 def stoppable_sleep(interval) Stud.stoppable_sleep(interval) { @stopping.true? } end |
#successful_connection? ⇒ Boolean
69 70 71 |
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 69 def successful_connection? !!maximum_seen_major_version end |