Module: Fluent::OpenSearchIndexTemplate

Included in:
Plugin::OpenSearchOutput
Defined in:
lib/fluent/plugin/opensearch_index_template.rb

Instance Method Summary collapse

Instance Method Details

#get_custom_template(template_file, customize_template) ⇒ Object



39
40
41
42
43
44
45
46
47
48
# File 'lib/fluent/plugin/opensearch_index_template.rb', line 39

# Loads a template file, applies textual substitutions, and parses it as JSON.
#
# The file text has its newline characters removed, then each pair yielded by
# +customize_template+ is applied in iteration order: every occurrence of the
# key is replaced with the lowercased value. The final text is handed to
# JSON.parse.
#
# @param template_file [String] path of the template file to read
# @param customize_template [Hash] placeholder => replacement pairs; each
#   replacement must respond to +downcase+
# @return [Object] whatever JSON.parse produces for the substituted text
# @raise [RuntimeError] when +template_file+ does not exist
def get_custom_template(template_file, customize_template)
  unless File.exist?(template_file)
    raise "If you specify a template_name you must specify a valid template file (checked '#{template_file}')!"
  end

  raw_text = IO.read(template_file).gsub(/\n/, '')
  # Thread the text through every substitution, one placeholder at a time.
  substituted = customize_template.reduce(raw_text) do |text, (placeholder, replacement)|
    text.gsub(placeholder, replacement.downcase)
  end
  JSON.parse(substituted)
end

#get_template(template_file) ⇒ Object



31
32
33
34
35
36
37
# File 'lib/fluent/plugin/opensearch_index_template.rb', line 31

# Reads a template file and parses it as JSON.
#
# Newline characters are removed from the file text before it is parsed.
#
# @param template_file [String] path of the template file to read
# @return [Object] whatever JSON.parse produces for the file text
# @raise [RuntimeError] when +template_file+ does not exist
def get_template(template_file)
  raise "If you specify a template_name you must specify a valid template file (checked '#{template_file}')!" unless File.exist?(template_file)

  single_line = IO.read(template_file).gsub(/\n/, '')
  JSON.parse(single_line)
end

#host_unreachable_exceptions ⇒ Object



61
62
63
# File 'lib/fluent/plugin/opensearch_index_template.rb', line 61

# Returns the +host_unreachable_exceptions+ value exposed by the inner
# transport of the default client (+client.transport.transport+).
#
# @return [Object] the value reported by the transport; retry_operate splats
#   it into a rescue clause, so it is expected to be a list of exception classes
def host_unreachable_exceptions
  inner_transport = client.transport.transport
  inner_transport.host_unreachable_exceptions
end

#indexcreation(index_name, host = nil) ⇒ Object



97
98
99
100
101
102
103
104
105
# File 'lib/fluent/plugin/opensearch_index_template.rb', line 97

# Asks the client to create +index_name+ and logs, rather than propagates,
# transport errors.
#
# A transport error whose message matches either "already exists" pattern is
# logged at debug level; any other transport error is logged at error level.
# In both cases the error is swallowed.
#
# @param index_name [String] name of the index to create
# @param host [Object, nil] forwarded to +client+ to select the connection
# @return [Object] the create response on success, otherwise the logger's
#   return value
def indexcreation(index_name, host = nil)
  client(host).indices.create(:index => index_name)
rescue OpenSearch::Transport::Transport::Error => e
  case e.message
  when /"already exists"/, /resource_already_exists_exception/
    log.debug("Index #{index_name} already exists")
  else
    log.error("Error while index creation - #{index_name}", error: e)
  end
end

#retry_operate(max_retries, fail_on_retry_exceed = true, catch_transport_exceptions = true) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/fluent/plugin/opensearch_index_template.rb', line 65

# Runs the given block, retrying with exponential backoff when communication
# with OpenSearch fails.
#
# Rescued errors are: everything in +host_unreachable_exceptions+,
# Timeout::Error, and (when +catch_transport_exceptions+ is true) every
# constant defined under OpenSearch::Transport::Transport::Errors. On each
# rescued error +@_es+ and +@_es_info+ are reset to nil (presumably so the
# connection is rebuilt on the next attempt - confirm against +client+).
#
# Before retry number k the method waits 2**k seconds. The warnings that
# announce the wait are emitted before sleeping, so the log shows the pending
# retry while the backoff is in progress rather than after it has elapsed.
#
# @param max_retries [Integer] maximum number of retries after the first attempt
# @param fail_on_retry_exceed [Boolean] raise once retries are exhausted
#   (otherwise only a warning is logged and nil is returned)
# @param catch_transport_exceptions [Boolean] also retry on OpenSearch
#   transport error classes
# @yield the operation to attempt
# @return [Object, nil] the block's value; nil when no block is given or when
#   retries are exhausted with +fail_on_retry_exceed+ false
# @raise [Fluent::Plugin::OpenSearchError::RetryableOperationExhaustedFailure]
#   when retries are exhausted and +fail_on_retry_exceed+ is true
def retry_operate(max_retries, fail_on_retry_exceed = true, catch_transport_exceptions = true)
  return unless block_given?

  retries = 0
  transport_errors =
    if catch_transport_exceptions
      errors_namespace = OpenSearch::Transport::Transport::Errors
      errors_namespace.constants.map { |const_name| errors_namespace.const_get(const_name) }
    else
      []
    end

  begin
    yield
  rescue *host_unreachable_exceptions, *transport_errors, Timeout::Error => e
    @_es = nil
    @_es_info = nil
    if retries < max_retries
      retries += 1
      wait_seconds = 2**retries
      # Announce the backoff first, then wait, so the message is accurate when read.
      log.warn "Could not communicate to OpenSearch, resetting connection and trying again. #{e.message}"
      log.warn "Remaining retry: #{max_retries - retries}. Retry to communicate after #{wait_seconds} second(s)."
      sleep wait_seconds
      retry
    end
    message = "Could not communicate to OpenSearch after #{retries} retries. #{e.message}"
    log.warn message
    raise Fluent::Plugin::OpenSearchError::RetryableOperationExhaustedFailure,
          message if fail_on_retry_exceed
  end
end

#rollover_alias_payload(rollover_alias) ⇒ Object



146
147
148
149
150
151
152
153
154
# File 'lib/fluent/plugin/opensearch_index_template.rb', line 146

# Builds a request body that declares +rollover_alias+ as an alias flagged
# with 'is_write_index' => true.
#
# @param rollover_alias [Object] alias name used as the key under 'aliases'
# @return [Hash] { 'aliases' => { rollover_alias => { 'is_write_index' => true } } }
def rollover_alias_payload(rollover_alias)
  alias_options = { 'is_write_index' => true }
  { 'aliases' => { rollover_alias => alias_options } }
end

#template_custom_install(template_name, template_file, overwrite, customize_template, host, target_index, index_separator) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/fluent/plugin/opensearch_index_template.rb', line 124

# Installs a customized template built by get_custom_template.
#
# The template is always loaded first. With +overwrite+ it is put
# unconditionally; otherwise it is put only when template_exists? reports that
# no template of that name is installed.
#
# @param template_name [String] name to install the template under
# @param template_file [String] path of the template file
# @param overwrite [Boolean] replace any existing template without checking
# @param customize_template [Hash] substitutions passed to get_custom_template
# @param host [Object, nil] forwarded to template_put / template_exists?
# @param target_index [Object] not used by this method
# @param index_separator [Object] not used by this method
# @return [Object] the return value of the final log call
def template_custom_install(template_name, template_file, overwrite, customize_template, host, target_index, index_separator)
  rendered_template = get_custom_template(template_file, customize_template)

  if overwrite
    template_put(template_name, rendered_template, host)
    log.info("Template '#{template_name}' overwritten with #{template_file}.")
  elsif template_exists?(template_name, host)
    log.debug("Template '#{template_name}' configured and already installed.")
  else
    template_put(template_name, rendered_template, host)
    log.info("Template configured, but no template installed. Installed '#{template_name}' from #{template_file}.")
  end
end

#template_exists?(name, host = nil) ⇒ Boolean

Returns:

  • (Boolean)


50
51
52
53
54
55
56
57
58
59
# File 'lib/fluent/plugin/opensearch_index_template.rb', line 50

# Reports whether a template called +name+ can be fetched from the cluster.
#
# Uses +get_template+ when +@use_legacy_template+ is truthy and
# +get_index_template+ otherwise. A NotFound transport error means the
# template is absent; any other error propagates.
#
# @param name [String] template name to look up
# @param host [Object, nil] forwarded to +client+ to select the connection
# @return [Boolean] true when the lookup succeeds, false on NotFound
def template_exists?(name, host = nil)
  indices_api = client(host).indices
  @use_legacy_template ? indices_api.get_template(:name => name) : indices_api.get_index_template(:name => name)
  true
rescue OpenSearch::Transport::Transport::Errors::NotFound
  false
end

#template_install(name, template_file, overwrite, host = nil, target_index = nil, index_separator = '-') ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin/opensearch_index_template.rb', line 107

# Installs the template stored in +template_file+ under +name+.
#
# With +overwrite+ the template is put unconditionally and nil is returned.
# Otherwise it is put only when template_exists? reports that no template of
# that name is installed.
#
# @param name [String] name to install the template under
# @param template_file [String] path handed to get_template
# @param overwrite [Boolean] replace any existing template without checking
# @param host [Object, nil] forwarded to template_put / template_exists?
# @param target_index [Object, nil] not used by this method
# @param index_separator [String] not used by this method
# @return [Object, nil] nil after an overwrite, otherwise the return value of
#   the final log call
def template_install(name, template_file, overwrite, host = nil, target_index = nil, index_separator = '-')
  if overwrite
    template_put(name, get_template(template_file), host)
    log.debug("Template '#{name}' overwritten with #{template_file}.")
    # The overwrite path deliberately yields nil rather than the logger's result.
    nil
  elsif template_exists?(name, host)
    log.debug("Template '#{name}' configured and already installed.")
  else
    template_put(name, get_template(template_file), host)
    log.info("Template configured, but no template installed. Installed '#{name}' from #{template_file}.")
  end
end

#template_put(name, template, host = nil) ⇒ Object



89
90
91
92
93
94
95
# File 'lib/fluent/plugin/opensearch_index_template.rb', line 89

# Sends +template+ to the cluster under +name+.
#
# Uses +put_template+ when +@use_legacy_template+ is truthy and
# +put_index_template+ otherwise.
#
# @param name [String] template name
# @param template [Object] template body sent as :body
# @param host [Object, nil] forwarded to +client+ to select the connection
# @return [Object] the response of the put call
def template_put(name, template, host = nil)
  indices_api = client(host).indices
  request = { :name => name, :body => template }
  @use_legacy_template ? indices_api.put_template(request) : indices_api.put_index_template(request)
end

#templates_hash_install(templates, overwrite) ⇒ Object



140
141
142
143
144
# File 'lib/fluent/plugin/opensearch_index_template.rb', line 140

# Calls template_install once for every (name, file) pair in +templates+,
# passing +overwrite+ through and leaving the remaining arguments at their
# defaults.
#
# @param templates [Hash] template name => template file path
# @param overwrite [Boolean] forwarded to template_install
# @return [Object] +templates+ itself (the receiver of +each+)
def templates_hash_install(templates, overwrite)
  templates.each { |template_name, template_file| template_install(template_name, template_file, overwrite) }
end