Module: Fluent::Plugin::ElasticsearchIndexLifecycleManagement
- Included in:
- ElasticsearchOutput
- Defined in:
- lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb
Constant Summary
- ILM_DEFAULT_POLICY_PATH =
"default-ilm-policy.json"
Instance Method Summary
- #create_ilm_policy(policy_id, ilm_policy = default_policy_payload, overwrite = false) ⇒ Object
- #default_policy_payload ⇒ Object
- #get_ilm_policy ⇒ Object
- #ilm_policy_exists?(policy_id) ⇒ Boolean
- #ilm_policy_put(policy_id, policy) ⇒ Object
- #setup_ilm(enable_ilm, policy_id, ilm_policy = default_policy_payload, overwrite = false) ⇒ Object
- #verify_ilm_working ⇒ Object
- #xpack_info ⇒ Object
Instance Method Details
#create_ilm_policy(policy_id, ilm_policy = default_policy_payload, overwrite = false) ⇒ Object
31 32 33 34 35 |
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 31
# Installs +ilm_policy+ under +policy_id+ when the policy is not present yet,
# or unconditionally when +overwrite+ is truthy.
#
# @param policy_id [Object] identifier handed to ilm_policy_exists? and ilm_policy_put
# @param ilm_policy [Object] policy body; defaults to default_policy_payload
# @param overwrite [Boolean] when truthy, install without checking for an existing policy
# @return [Object, nil] whatever ilm_policy_put returns, or nil when nothing was installed
def create_ilm_policy(policy_id, ilm_policy = default_policy_payload, overwrite = false)
  # The existence lookup is only performed when overwrite was not requested.
  keep_existing = !overwrite && ilm_policy_exists?(policy_id)
  return if keep_existing

  ilm_policy_put(policy_id, ilm_policy)
end
#default_policy_payload ⇒ Object
77 78 79 80 |
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 77
# Loads the default ILM policy document.
#
# The file named by ILM_DEFAULT_POLICY_PATH is looked up in the directory of
# this source file, read in full, and handed to Yajl.load.
#
# @return [Object] the value produced by Yajl.load for the file contents
def default_policy_payload
  raw_policy = ::IO.read(File.join(__dir__, ILM_DEFAULT_POLICY_PATH))
  Yajl.load(raw_policy)
end
#get_ilm_policy ⇒ Object
47 48 49 50 51 52 53 |
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 47
# Fetches ILM policies through the client, choosing the API method by the
# value of Elasticsearch::VERSION.
#
# @return [Object] result of client.ilm.get_lifecycle when the version is
#   8.0.0 or newer, otherwise the result of client.ilm.get_policy
def get_ilm_policy
  lifecycle_api = Gem::Version.new(Elasticsearch::VERSION) >= Gem::Version.new("8.0.0")
  return client.ilm.get_lifecycle if lifecycle_api

  client.ilm.get_policy
end
#ilm_policy_exists?(policy_id) ⇒ Boolean
55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 55
# Reports whether a lookup of +policy_id+ through the client succeeds.
#
# The lookup method is chosen by Elasticsearch::VERSION: get_lifecycle for
# 8.0.0 and newer, get_policy otherwise.
#
# @param policy_id [Object] identifier of the policy to look up
# @return [Boolean] true when the lookup returns normally, false when it
#   raises any StandardError
def ilm_policy_exists?(policy_id)
  lifecycle_api = Gem::Version.new(Elasticsearch::VERSION) >= Gem::Version.new("8.0.0")
  if lifecycle_api
    client.ilm.get_lifecycle(policy: policy_id)
  else
    client.ilm.get_policy(policy_id: policy_id)
  end
  true
rescue
  # Every StandardError raised above is reported as "policy not present".
  false
end
#ilm_policy_put(policy_id, policy) ⇒ Object
68 69 70 71 72 73 74 75 |
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 68
# Logs the policy at info level and then stores it under +policy_id+ through
# the client.
#
# The client method is chosen by Elasticsearch::VERSION: put_lifecycle for
# 8.0.0 and newer, put_policy otherwise.
#
# @param policy_id [Object] identifier to store the policy under
# @param policy [Object] policy body, sent as the request body
# @return [Object] result of the client call
def ilm_policy_put(policy_id, policy)
  log.info("Installing ILM policy: #{policy}")

  lifecycle_api = Gem::Version.new(Elasticsearch::VERSION) >= Gem::Version.new("8.0.0")
  return client.ilm.put_lifecycle(policy: policy_id, body: policy) if lifecycle_api

  client.ilm.put_policy(policy_id: policy_id, body: policy)
end
#setup_ilm(enable_ilm, policy_id, ilm_policy = default_policy_payload, overwrite = false) ⇒ Object
6 7 8 9 10 |
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 6
# Entry point for ILM setup: delegates to create_ilm_policy when ILM is
# enabled and does nothing otherwise.
#
# @param enable_ilm [Boolean] when falsy, no policy is created
# @param policy_id [Object] forwarded to create_ilm_policy
# @param ilm_policy [Object] forwarded to create_ilm_policy; defaults to default_policy_payload
# @param overwrite [Boolean] forwarded to create_ilm_policy
# @return [Object, nil] result of create_ilm_policy, or nil when ILM is disabled
def setup_ilm(enable_ilm, policy_id, ilm_policy = default_policy_payload, overwrite = false)
  create_ilm_policy(policy_id, ilm_policy, overwrite) if enable_ilm
end
#verify_ilm_working ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 12
# Checks the Elasticsearch instance for ILM readiness - this means that the
# version has to be a non-OSS release, with the ILM feature available and
# enabled.
#
# Reads the "features" => "ilm" entry of the xpack_info response and checks
# its 'available' and 'enabled' flags.
#
# @return [nil] when ILM is reported as both available and enabled
# @raise [Fluent::ConfigError] when xpack_info returns nil, when the ILM
#   feature entry is missing, when it is not available or not enabled, or
#   when a ::TRANSPORT_CLASS::Transport::Error is raised during the check
def verify_ilm_working
  xpack = xpack_info
  raise Fluent::ConfigError, "xpack endpoint does not work" if xpack.nil?

  features = xpack["features"]
  # A missing "features" section is treated the same as a missing "ilm" entry.
  ilm = features.nil? ? nil : features["ilm"]
  raise Fluent::ConfigError, "Index Lifecycle management is enabled in Fluentd, but not installed on your Elasticsearch" if ilm.nil?
  raise Fluent::ConfigError, "Index Lifecycle management is enabled in Fluentd, but not available in your Elasticsearch" unless ilm['available']
  raise Fluent::ConfigError, "Index Lifecycle management is enabled in Fluentd, but not enabled in your Elasticsearch" unless ilm['enabled']
rescue ::TRANSPORT_CLASS::Transport::Error => e
  # Attach the transport error via `cause:`. A trailing `error: e` hash would be
  # taken as raise's positional backtrace argument and trigger a TypeError
  # instead of the intended ConfigError.
  raise Fluent::ConfigError, "Index Lifecycle management is enabled in Fluentd, but not installed on your Elasticsearch", cause: e
end
#xpack_info ⇒ Object
37 38 39 40 41 42 43 44 45 |
# File 'lib/fluent/plugin/elasticsearch_index_lifecycle_management.rb', line 37
# Queries client.xpack.info.
#
# @return [Object, nil] the response of client.xpack.info, or nil when the
#   call raises any StandardError other than NoMethodError
# @raise [RuntimeError] when the call raises NoMethodError, reported as the
#   elasticsearch-xpack gem not being installed
def xpack_info
  client.xpack.info
rescue NoMethodError
  raise "elasticsearch-xpack gem is not installed."
rescue
  # All remaining StandardErrors are reported to the caller as nil.
  nil
end