Module: LogStash::PluginMixins::ElasticSearch::Common

Included in:
Outputs::ElasticSearch
Defined in:
lib/logstash/plugin_mixins/elasticsearch/common.rb

Constant Summary collapse

DOC_DLQ_CODES =

These codes apply to documents, not at the request level

[400, 404]
DOC_SUCCESS_CODES =
[200, 201]
DOC_CONFLICT_CODE =
409

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#hostsObject (readonly)

This module defines common methods that can be reused by alternate elasticsearch output plugins such as the elasticsearch_data_streams output.



8
9
10
# File 'lib/logstash/plugin_mixins/elasticsearch/common.rb', line 8

def hosts
  @hosts
end

Instance Method Details

#alive_urls_countObject



152
153
154
# File 'lib/logstash/plugin_mixins/elasticsearch/common.rb', line 152

def alive_urls_count
  client.alive_urls_count
end

#build_client(license_checker = nil) ⇒ HttpClient

Perform some ES options validations and Build the HttpClient. Note that this methods may sets the @user, @password, @hosts and @client ivars as a side effect.

Parameters:

  • license_checker (#appropriate_license?) (defaults to: nil)

    An optional license checker that will be used by the Pool class.

Returns:

  • (HttpClient)

    the new http client



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/logstash/plugin_mixins/elasticsearch/common.rb', line 19

def build_client(license_checker = nil)
  params["license_checker"] = license_checker

  # the following 3 options validation & setup methods are called inside build_client
  # because they must be executed prior to building the client and logstash
  # monitoring and management rely on directly calling build_client
  # see https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/934#pullrequestreview-396203307
  fill_hosts_from_cloud_id
  validate_authentication

  setup_hosts

  params['ssl_enabled'] = effectively_ssl? unless params.include?('ssl_enabled')

  # inject the TrustStrategy from CATrustedFingerprintSupport
  if trust_strategy_for_ca_trusted_fingerprint
    params["ssl_trust_strategy"] = trust_strategy_for_ca_trusted_fingerprint
  end

  params["metric"] = metric
  if @proxy.eql?('')
    @logger.warn "Supplied proxy setting (proxy => '') has no effect"
  end
  ::LogStash::Outputs::ElasticSearch::HttpClientBuilder.build(@logger, @hosts, params)
end

#discover_cluster_uuidObject



177
178
179
180
181
182
183
184
185
# File 'lib/logstash/plugin_mixins/elasticsearch/common.rb', line 177

def discover_cluster_uuid
  return unless defined?()
  cluster_info = client.get('/')
  .set(:cluster_uuid, cluster_info['cluster_uuid'])
rescue => e
  details = { message: e.message, exception: e.class, backtrace: e.backtrace }
  details[:body] = e.response_body if e.respond_to?(:response_body)
  @logger.error("Unable to retrieve Elasticsearch cluster uuid", details)
end

#effectively_ssl?Boolean

Returns:

  • (Boolean)


75
76
77
78
79
80
81
82
# File 'lib/logstash/plugin_mixins/elasticsearch/common.rb', line 75

def effectively_ssl?
  return @ssl_enabled unless @ssl_enabled.nil?

  hosts = Array(@hosts)
  return false if hosts.nil? || hosts.empty?

  hosts.all? { |host| host && host.scheme == "https" }
end

#fill_hosts_from_cloud_idObject



90
91
92
93
94
95
96
97
# File 'lib/logstash/plugin_mixins/elasticsearch/common.rb', line 90

def fill_hosts_from_cloud_id
  return unless @cloud_id

  if @hosts && !hosts_default?(@hosts)
    raise LogStash::ConfigurationError, 'Both cloud_id and hosts specified, please only use one of those.'
  end
  @hosts = parse_host_uri_from_cloud_id(@cloud_id)
end

#handle_dlq_response(message, action, status, response) ⇒ Object



235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/logstash/plugin_mixins/elasticsearch/common.rb', line 235

def handle_dlq_response(message, action, status, response)
  event, action_params = action.event, [action[0], action[1], action[2]]

  if @dlq_writer
    # TODO: Change this to send a map with { :status => status, :action => action } in the future
    detailed_message = "#{message} status: #{status}, action: #{action_params}, response: #{response}"
    @dlq_writer.write(event, "#{detailed_message}")
  else
    log_level = dig_value(response, 'index', 'error', 'type') == 'invalid_index_name_exception' ? :error : :warn

    @logger.public_send(log_level, message, status: status, action: action_params, response: response)
  end
end

#last_es_versionObject



140
141
142
# File 'lib/logstash/plugin_mixins/elasticsearch/common.rb', line 140

def last_es_version
  client.last_es_version
end

#maximum_seen_major_versionObject



144
145
146
# File 'lib/logstash/plugin_mixins/elasticsearch/common.rb', line 144

def maximum_seen_major_version
  client.maximum_seen_major_version
end

#next_sleep_interval(current_interval) ⇒ Object



230
231
232
233
# File 'lib/logstash/plugin_mixins/elasticsearch/common.rb', line 230

def next_sleep_interval(current_interval)
  doubled = current_interval * 2
  doubled > @retry_max_interval ? @retry_max_interval : doubled
end

#retrying_submit(actions) ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/logstash/plugin_mixins/elasticsearch/common.rb', line 187

def retrying_submit(actions)
  # Initially we submit the full list of actions
  submit_actions = actions

  sleep_interval = @retry_initial_interval

  while submit_actions && submit_actions.size > 0

    # We retry with whatever is didn't succeed
    begin
      submit_actions = submit(submit_actions)
      if submit_actions && submit_actions.size > 0
        @logger.info("Retrying individual bulk actions that failed or were rejected by the previous bulk request", count: submit_actions.size)
      end
    rescue => e
      if abort_batch_present? && e.instance_of?(org.logstash.execution.AbortedBatchException)
        # if Logstash support abort of a batch and the batch is aborting,
        # bubble up the exception so that the pipeline can handle it
        raise e
      else
        @logger.error("Encountered an unexpected error submitting a bulk request, will retry",
                      message: e.message, exception: e.class, backtrace: e.backtrace)
      end
    end

    # Everything was a success!
    break if !submit_actions || submit_actions.empty?

    # If we're retrying the action sleep for the recommended interval
    # Double the interval for the next time through to achieve exponential backoff
    sleep_interval = sleep_for_interval(sleep_interval)
  end
end

#serverless?Boolean

Returns:

  • (Boolean)


148
149
150
# File 'lib/logstash/plugin_mixins/elasticsearch/common.rb', line 148

def serverless?
  client.serverless?
end

#setup_hostsObject



67
68
69
70
71
72
73
# File 'lib/logstash/plugin_mixins/elasticsearch/common.rb', line 67

def setup_hosts
  @hosts = Array(@hosts)
  if @hosts.empty?
    @logger.info("No 'host' set in elasticsearch output. Defaulting to localhost")
    @hosts.replace(["localhost"])
  end
end

#sleep_for_interval(sleep_interval) ⇒ Object



221
222
223
224
# File 'lib/logstash/plugin_mixins/elasticsearch/common.rb', line 221

def sleep_for_interval(sleep_interval)
  stoppable_sleep(sleep_interval)
  next_sleep_interval(sleep_interval)
end

#stoppable_sleep(interval) ⇒ Object



226
227
228
# File 'lib/logstash/plugin_mixins/elasticsearch/common.rb', line 226

def stoppable_sleep(interval)
  Stud.stoppable_sleep(interval) { @stopping.true? }
end

#successful_connection?Boolean

Returns:

  • (Boolean)


156
157
158
# File 'lib/logstash/plugin_mixins/elasticsearch/common.rb', line 156

def successful_connection?
  !!maximum_seen_major_version && alive_urls_count > 0
end