Module: LogStash::PluginMixins::OpenSearch::Common

Included in:
Outputs::OpenSearch
Defined in:
lib/logstash/plugin_mixins/opensearch/common.rb

Constant Summary collapse

DOC_DLQ_CODES =

These codes apply to documents, not at the request level

[400, 404]
DOC_SUCCESS_CODES =
[200, 201]
DOC_CONFLICT_CODE =
409

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#hosts ⇒ Object (readonly)

This module defines common methods that can be reused by alternate opensearch output plugins.



17
18
19
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 17

# Reader for the @hosts instance variable.
#
# @return [Object, nil] whatever is currently stored in @hosts; nil if it has
#   not been assigned yet. setup_hosts coerces it to an Array.
def hosts
  @hosts
end

Instance Method Details

#build_client ⇒ HttpClient

Performs some OpenSearch option validation and builds the HttpClient. Note that this method may set the @user, @password, @hosts and @client ivars as a side effect.

Returns:

  • (HttpClient)

    the new http client



27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 27

# Prepares the host list and builds the HTTP client for this output.
#
# Host setup is done here rather than by the caller because it has to run
# before the client is built, and logstash monitoring and management rely on
# calling build_client directly.
#
# Side effects: setup_hosts normalizes @hosts, the plugin's metric is stored
# under params["metric"], and a warning is logged when @proxy is an empty
# string.
#
# @return [HttpClient] the client produced by HttpClientBuilder.build
def build_client
  setup_hosts

  params["metric"] = metric
  @logger.warn "Supplied proxy setting (proxy => '') has no effect" if @proxy.eql?('')

  ::LogStash::Outputs::OpenSearch::HttpClientBuilder.build(@logger, @hosts, params)
end

#discover_cluster_uuid ⇒ Object



88
89
90
91
92
93
94
95
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 88

# Looks up the cluster UUID via GET / and records it in the plugin metadata
# under :cluster_uuid.
#
# Does nothing when plugin_metadata is not available on this object, or when
# the configured auth_type has service_name "aoss" (that service does not
# support GET /).
#
# Any StandardError raised during the lookup is logged and swallowed, so a
# failed discovery never propagates to the caller.
#
# @return [Object, nil] the result of plugin_metadata.set on success, nil when
#   discovery is skipped, otherwise the result of the error log call
def discover_cluster_uuid
  return unless defined?(plugin_metadata)

  auth_type = params && params['auth_type']
  return if auth_type && auth_type['service_name'] == "aoss" # AOSS doesn't support GET /

  cluster_info = client.get('/')
  # The receiver must be explicit: a bare leading ".set" would chain onto the
  # previous line and run before cluster_info has been assigned.
  plugin_metadata.set(:cluster_uuid, cluster_info['cluster_uuid'])
rescue => e
  @logger.error("Unable to retrieve OpenSearch cluster uuid", message: e.message, exception: e.class, backtrace: e.backtrace)
end

#handle_dlq_status(message, action, status, response) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 139

# Routes a failed document either to the dead letter queue or to the log.
#
# When a DLQ writer is configured, the action's event is written to it along
# with a reason string built from the message, status, a three-element
# summary of the action and the response. Without a DLQ writer (kept for
# backwards compatibility) the failure is only logged: at error level for an
# invalid_index_name_exception, at warn level otherwise.
#
# @param message [String] text describing the failure
# @param action [#event, #[]] the failed action; elements 0..2 are reported
# @param status [Object] status reported for the document
# @param response [Object] response entry for the document
# @return [Object] the result of the DLQ write or of the logger call
def handle_dlq_status(message, action, status, response)
  unless @dlq_writer
    invalid_index = dig_value(response, 'index', 'error', 'type') == 'invalid_index_name_exception'
    severity = invalid_index ? :error : :warn
    return @logger.send(severity, message, status: status, action: action, response: response)
  end

  event = action.event
  action_summary = [action[0], action[1], action[2]]
  # TODO: Change this to send a map with { :status => status, :action => action } in the future
  @dlq_writer.write(event, "#{message} status: #{status}, action: #{action_summary}, response: #{response}")
end

#last_version ⇒ Object



61
62
63
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 61

# Delegates to the client's last_version.
#
# @return [Object] whatever client.last_version returns
def last_version
  client.last_version
end

#maximum_seen_major_version ⇒ Object



65
66
67
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 65

# Delegates to the client's maximum_seen_major_version.
#
# @return [Object] whatever client.maximum_seen_major_version returns;
#   successful_connection? treats a truthy value as "connected"
def maximum_seen_major_version
  client.maximum_seen_major_version
end

#next_sleep_interval(current_interval) ⇒ Object



134
135
136
137
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 134

# Computes the next back-off interval: twice the current one, capped at
# @retry_max_interval.
#
# @param current_interval [Numeric] the interval that was just used
# @return [Numeric] the doubled interval, or @retry_max_interval if the
#   doubled value exceeds it
def next_sleep_interval(current_interval)
  candidate = current_interval * 2
  return @retry_max_interval if candidate > @retry_max_interval

  candidate
end

#retrying_submit(actions) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 97

# Submits the given actions, resubmitting whatever is handed back until
# nothing is left.
#
# Each pass calls submit with the outstanding actions; its return value
# becomes the set to retry. If submit raises, the error is logged and the
# same set is retried. Between passes the method sleeps via
# sleep_for_interval, which also yields the next (longer) interval.
#
# @param actions [Array, nil] the bulk actions to send
# @return [nil]
def retrying_submit(actions)
  # Start with the complete list; later passes only carry what still needs sending
  pending = actions
  backoff = @retry_initial_interval

  while pending && pending.length > 0
    begin
      pending = submit(pending)
      retry_count = pending ? pending.size : 0
      if retry_count > 0
        @logger.info("Retrying individual bulk actions that failed or were rejected by the previous bulk request", count: retry_count)
      end
    rescue => e
      # pending is left untouched here, so the same batch goes out again
      @logger.error("Encountered an unexpected error submitting a bulk request, will retry",
                    message: e.message, exception: e.class, backtrace: e.backtrace)
    end

    # Nothing left to resend: we are done
    break unless pending && !pending.empty?

    # Wait before the next attempt; the helper returns the interval to use next time
    backoff = sleep_for_interval(backoff)
  end
end

#setup_hosts ⇒ Object



41
42
43
44
45
46
47
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 41

# Normalizes @hosts into an Array, falling back to localhost when none are
# configured.
#
# @return [Array, nil] the host list when the localhost default was applied,
#   nil when hosts were already present
def setup_hosts
  @hosts = Array(@hosts)
  return unless @hosts.empty?

  @logger.info("No 'host' set in opensearch output. Defaulting to localhost")
  @hosts.replace(["localhost"])
end

#sleep_for_interval(sleep_interval) ⇒ Object



125
126
127
128
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 125

# Sleeps for the given interval, then reports the interval to use next time.
#
# @param sleep_interval [Numeric] how long to sleep now (passed straight to
#   stoppable_sleep)
# @return [Numeric] the follow-up interval from next_sleep_interval, i.e. the
#   doubled value capped at @retry_max_interval
def sleep_for_interval(sleep_interval)
  # Sleep first; the next interval is derived from the one just used
  stoppable_sleep(sleep_interval)
  next_sleep_interval(sleep_interval)
end

#stoppable_sleep(interval) ⇒ Object



130
131
132
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 130

# Sleeps through Stud.stoppable_sleep, handing it a block that reports whether
# @stopping is true.
#
# NOTE(review): presumably the sleep is cut short once the block returns
# true — confirm against the Stud documentation.
#
# @param interval [Numeric] requested sleep duration, passed to Stud unchanged
# @return [Object] whatever Stud.stoppable_sleep returns
def stoppable_sleep(interval)
  Stud.stoppable_sleep(interval) { @stopping.true? }
end

#successful_connection? ⇒ Boolean

Returns:

  • (Boolean)


69
70
71
# File 'lib/logstash/plugin_mixins/opensearch/common.rb', line 69

# Reports whether a major version has been seen, i.e. whether
# maximum_seen_major_version returns a truthy value.
#
# @return [Boolean] true when maximum_seen_major_version is neither nil nor
#   false, false otherwise
def successful_connection?
  maximum_seen_major_version ? true : false
end