Module: Fluent::Plugin::ElasticsearchIndexLifecycleManagement

Included in:
ElasticsearchOutput
Defined in:
lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb

Constant Summary collapse

ILM_DEFAULT_POLICY_PATH =
"default-ilm-policy.json"

Instance Method Summary collapse

Instance Method Details

#create_ilm_policy(policy_id, ilm_policy = default_policy_payload, overwrite = false) ⇒ Object



31
32
33
34
35
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 31

def create_ilm_policy(policy_id, ilm_policy = default_policy_payload, overwrite = false)
  if overwrite || !ilm_policy_exists?(policy_id)
    ilm_policy_put(policy_id, ilm_policy)
  end
end

#default_policy_payloadObject



77
78
79
80
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 77

def default_policy_payload
  default_policy_path = File.join(__dir__, ILM_DEFAULT_POLICY_PATH)
  Yajl.load(::IO.read(default_policy_path))
end

#get_ilm_policyObject



47
48
49
50
51
52
53
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 47

def get_ilm_policy
  if Gem::Version.new(Elasticsearch::VERSION) >= Gem::Version.new("8.0.0")
    client.ilm.get_lifecycle
  else
    client.ilm.get_policy
  end
end

#ilm_policy_exists?(policy_id) ⇒ Boolean

Returns:

  • (Boolean)


55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 55

def ilm_policy_exists?(policy_id)
  begin
    if Gem::Version.new(Elasticsearch::VERSION) >= Gem::Version.new("8.0.0")
      client.ilm.get_lifecycle(policy: policy_id)
    else
      client.ilm.get_policy(policy_id: policy_id)
    end
    true
  rescue
    false
  end
end

#ilm_policy_put(policy_id, policy) ⇒ Object



68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 68

def ilm_policy_put(policy_id, policy)
  log.info("Installing ILM policy: #{policy}")
  if Gem::Version.new(Elasticsearch::VERSION) >= Gem::Version.new("8.0.0")
    client.ilm.put_lifecycle(policy: policy_id, body: policy)
  else
    client.ilm.put_policy(policy_id: policy_id, body: policy)
  end
end

#setup_ilm(enable_ilm, policy_id, ilm_policy = default_policy_payload, overwrite = false) ⇒ Object



6
7
8
9
10
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 6

def setup_ilm(enable_ilm, policy_id, ilm_policy = default_policy_payload, overwrite = false)
  return unless enable_ilm

  create_ilm_policy(policy_id, ilm_policy, overwrite)
end

#verify_ilm_workingObject



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 12

def verify_ilm_working
  # Check the Elasticsearch instance for ILM readiness - this means that the version has to be a non-OSS release, with ILM feature
  # available and enabled.
  begin
    xpack = xpack_info
    if xpack.nil?
      raise Fluent::ConfigError, "xpack endpoint does not work"
    end
    features = xpack["features"]
    ilm = features.nil? ? nil : features["ilm"]
    raise Fluent::ConfigError, "Index Lifecycle management is enabled in Fluentd, but not installed on your Elasticsearch" if features.nil? || ilm.nil?
    raise Fluent::ConfigError, "Index Lifecycle management is enabled in Fluentd, but not available in your Elasticsearch" unless ilm['available']
    raise Fluent::ConfigError, "Index Lifecycle management is enabled in Fluentd, but not enabled in your Elasticsearch" unless ilm['enabled']

  rescue ::TRANSPORT_CLASS::Transport::Error => e
    raise Fluent::ConfigError, "Index Lifecycle management is enabled in Fluentd, but not installed on your Elasticsearch", error: e
  end
end

#xpack_infoObject



37
38
39
40
41
42
43
44
45
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 37

def xpack_info
  begin
    client.xpack.info
  rescue NoMethodError
    raise RuntimeError, "elasticsearch-xpack gem is not installed."
  rescue
    nil
  end
end