Module: Fluent::Plugin::ElasticsearchIndexLifecycleManagement

Included in:
ElasticsearchOutput
Defined in:
lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb

Constant Summary collapse

ILM_DEFAULT_POLICY_PATH =
"default-ilm-policy.json"

Instance Method Summary collapse

Instance Method Details

#create_ilm_policy(policy_id, ilm_policy = default_policy_payload, overwrite = false) ⇒ Object



29
30
31
32
33
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 29

# Installs an ILM policy, skipping the install when the policy is already present.
#
# @param policy_id [Object] identifier handed to #ilm_policy_exists? and #ilm_policy_put
# @param ilm_policy [Object] policy body to install; defaults to #default_policy_payload
# @param overwrite [Boolean] when truthy, install without looking for an existing policy
# @return [Object, nil] whatever #ilm_policy_put returns, or nil when nothing was installed
def create_ilm_policy(policy_id, ilm_policy = default_policy_payload, overwrite = false)
  # The existence lookup only happens when overwrite is falsy.
  return if !overwrite && ilm_policy_exists?(policy_id)

  ilm_policy_put(policy_id, ilm_policy)
end

#default_policy_payload ⇒ Object



63
64
65
66
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 63

# Loads the default ILM policy from the file named by ILM_DEFAULT_POLICY_PATH,
# resolved relative to the directory of this source file.
#
# @return [Object] the result of Yajl.load applied to the file's contents
def default_policy_payload
  policy_file = File.join(__dir__, ILM_DEFAULT_POLICY_PATH)
  raw_json = File.read(policy_file)
  Yajl.load(raw_json)
end

#get_ilm_policy ⇒ Object



45
46
47
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 45

# Calls get_policy, without arguments, on the client's ILM API object.
#
# @return [Object] whatever client.ilm.get_policy returns
def get_ilm_policy
  ilm_api = client.ilm
  ilm_api.get_policy
end

#ilm_policy_exists?(policy_id) ⇒ Boolean

Returns:

  • (Boolean)


49
50
51
52
53
54
55
56
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 49

# Reports whether an ILM policy can be fetched under the given id.
#
# Any StandardError raised by the lookup yields false, so a failed request is
# indistinguishable from a missing policy in the return value. The swallowed
# error is written to the debug log so such failures can still be diagnosed.
#
# @param policy_id [Object] identifier passed to get_policy as policy_id:
# @return [Boolean] true when get_policy returns without raising, false otherwise
def ilm_policy_exists?(policy_id)
  client.ilm.get_policy(policy_id: policy_id)
  true
rescue StandardError => e
  # Keep the best-effort contract (never raise), but do not discard the reason.
  log.debug("ILM policy lookup for #{policy_id} failed: #{e.class}: #{e.message}")
  false
end

#ilm_policy_put(policy_id, policy) ⇒ Object



58
59
60
61
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 58

# Logs the policy at info level, then sends it to the client's ILM API.
#
# @param policy_id [Object] passed to put_policy as policy_id:
# @param policy [Object] passed to put_policy as body: and interpolated into the log line
# @return [Object] whatever put_policy returns
def ilm_policy_put(policy_id, policy)
  log.info("Installing ILM policy: #{policy}")

  ilm_api = client.ilm
  ilm_api.put_policy(policy_id: policy_id, body: policy)
end

#setup_ilm(enable_ilm, policy_id, ilm_policy = default_policy_payload, overwrite = false) ⇒ Object



4
5
6
7
8
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 4

# Creates the ILM policy through #create_ilm_policy, but only when ILM is enabled.
#
# Note that the ilm_policy default is a parameter default, so it is evaluated
# whenever the argument is omitted, even if enable_ilm is falsy.
#
# @param enable_ilm [Boolean] when falsy, nothing is created and nil is returned
# @param policy_id [Object] forwarded to #create_ilm_policy
# @param ilm_policy [Object] forwarded to #create_ilm_policy; defaults to #default_policy_payload
# @param overwrite [Boolean] forwarded to #create_ilm_policy
# @return [Object, nil] the result of #create_ilm_policy, or nil when ILM is disabled
def setup_ilm(enable_ilm, policy_id, ilm_policy = default_policy_payload, overwrite = false)
  create_ilm_policy(policy_id, ilm_policy, overwrite) if enable_ilm
end

#verify_ilm_working ⇒ Object



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 10

# Checks the Elasticsearch instance for ILM readiness - this means that the version has to be
# a non-OSS release, with the ILM feature available and enabled.
#
# Reads the "features" => "ilm" entry of the hash returned by #xpack_info and
# requires its "available" and "enabled" values to be truthy.
#
# @return [nil] when every check passes
# @raise [Fluent::ConfigError] when #xpack_info returns nil, when the ILM feature
#   entry is missing, not available or not enabled, or when an
#   Elasticsearch::Transport::Transport::Error is raised while checking (the
#   original error is attached as the exception's cause)
def verify_ilm_working
  xpack = xpack_info
  raise Fluent::ConfigError, "xpack endpoint does not work" if xpack.nil?

  features = xpack["features"]
  ilm = features && features["ilm"]
  raise Fluent::ConfigError, "Index Lifecycle management is enabled in Fluentd, but not installed on your Elasticsearch" if ilm.nil?
  raise Fluent::ConfigError, "Index Lifecycle management is enabled in Fluentd, but not available in your Elasticsearch" unless ilm['available']
  raise Fluent::ConfigError, "Index Lifecycle management is enabled in Fluentd, but not enabled in your Elasticsearch" unless ilm['enabled']
rescue Elasticsearch::Transport::Transport::Error => e
  # Kernel#raise only understands the cause: keyword. Any other trailing hash is
  # taken as the backtrace argument and raises TypeError instead of ConfigError.
  raise Fluent::ConfigError, "Index Lifecycle management is enabled in Fluentd, but not installed on your Elasticsearch", cause: e
end

#xpack_info ⇒ Object



35
36
37
38
39
40
41
42
43
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 35

# Fetches X-Pack information from the client.
#
# A NoMethodError raised anywhere in the client.xpack.info call chain is
# reported as the elasticsearch-xpack gem being absent; every other
# StandardError is swallowed and turned into nil.
#
# @return [Object, nil] the value of client.xpack.info, or nil when the call fails
# @raise [RuntimeError] when the call raises NoMethodError
def xpack_info
  client.xpack.info
rescue NoMethodError
  raise "elasticsearch-xpack gem is not installed."
rescue StandardError
  nil
end