Class: Elasticity::Strategies::AliasIndex
- Inherits:
-
Object
- Object
- Elasticity::Strategies::AliasIndex
- Defined in:
- lib/elasticity/strategies/alias_index.rb
Overview
This strategy maintains two aliases (the main alias and the update alias) that may point to the same index or to different indexes, which allows changes at runtime by atomically updating the aliases. For an example, see the remap method implementation.
Constant Summary collapse
- SNAPSHOT_ERROR_SNIPPET =
"Cannot delete indices that are being snapshotted"
- RETRYABLE_ERROR_SNIPPETS =
[ SNAPSHOT_ERROR_SNIPPET ].freeze
- STATUSES =
[:missing, :ok]
Instance Method Summary collapse
- #bulk {|b| ... } ⇒ Object
- #create(index_def) ⇒ Object
- #create_if_undefined(index_def) ⇒ Object
- #delete ⇒ Object
- #delete_by_query(body) ⇒ Object
- #delete_document(id) ⇒ Object
- #delete_if_defined ⇒ Object
- #flush ⇒ Object
- #get_document(id) ⇒ Object
- #index_document(id, attributes) ⇒ Object
-
#initialize(client, index_base_name, document_type, use_new_timestamp_format = nil, include_type_name_on_create = nil) ⇒ AliasIndex
constructor
A new instance of AliasIndex.
- #main_indexes ⇒ Object
- #mapping ⇒ Object
- #mappings ⇒ Object
- #missing? ⇒ Boolean
- #recreate(index_def) ⇒ Object
- #ref_index_name ⇒ Object
- #refresh ⇒ Object
-
#remap(index_def, retry_delete_on_recoverable_errors: false, retry_delay: 0, max_delay: 0) ⇒ Object
Remap allows zero-downtime, zero-data-loss remapping of Elasticsearch indexes.
- #search_index ⇒ Object
- #settings ⇒ Object
- #status ⇒ Object
- #update_indexes ⇒ Object
Constructor Details
#initialize(client, index_base_name, document_type, use_new_timestamp_format = nil, include_type_name_on_create = nil) ⇒ AliasIndex
Returns a new instance of AliasIndex.
14 15 16 17 18 19 |
# File 'lib/elasticity/strategies/alias_index.rb', line 14
#
# Builds the strategy around two aliases derived from +index_base_name+:
# the main alias (the base name itself) and the update alias
# ("#{index_base_name}_update").
#
# @param client object used for every Elasticsearch call (presumably the
#   Elasticity client wrapper — TODO confirm)
# @param index_base_name [String] name of the main alias; also the prefix of
#   the update alias
# @param document_type stored as-is in @document_type
# @param use_new_timestamp_format accepted but not read by this constructor
#   (possibly kept for interface compatibility — TODO confirm)
# @param include_type_name_on_create accepted but not read by this constructor
#   (possibly kept for interface compatibility — TODO confirm)
def initialize(client, index_base_name, document_type, use_new_timestamp_format = nil, include_type_name_on_create = nil)
  @client = client
  @main_alias = index_base_name
  @update_alias = "#{index_base_name}_update"
  @document_type = document_type
end
Instance Method Details
#bulk {|b| ... } ⇒ Object
251 252 253 254 255 |
# File 'lib/elasticity/strategies/alias_index.rb', line 251
#
# Collects bulk operations through a Bulk::Alias batch and executes it.
#
# The batch targets the update alias and the indexes currently behind the
# main alias. It is yielded to the caller's block and then executed.
#
# @yieldparam batch [Bulk::Alias] batch to add operations to
# @return whatever Bulk::Alias#execute returns
def bulk
  Bulk::Alias.new(@client, @update_alias, main_indexes).tap { |batch| yield batch }.execute
end
#create(index_def) ⇒ Object
187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/elasticity/strategies/alias_index.rb', line 187
#
# Creates a new index from +index_def+ and points both the main alias and the
# update alias at it. The created index name is remembered in
# @created_index_name.
#
# @param index_def definition forwarded to create_index
# @return the result of the alias update call
# @raise [IndexError] when the aliases already exist (missing? is false);
#   IndexError here takes the alias name and a message, so it is presumably
#   the library's own error class — TODO confirm
def create(index_def)
  raise IndexError.new(@main_alias, "index already exists") unless missing?

  @created_index_name = create_index(index_def)
  alias_actions = [@main_alias, @update_alias].map do |alias_name|
    { add: { index: @created_index_name, alias: alias_name } }
  end
  @client.index_update_aliases(body: { actions: alias_actions })
end
#create_if_undefined(index_def) ⇒ Object
202 203 204 |
# File 'lib/elasticity/strategies/alias_index.rb', line 202
#
# Creates the index only when the aliases do not exist yet.
#
# @param index_def definition forwarded to create
# @return the result of create, or nil when nothing was created
def create_if_undefined(index_def)
  return unless missing?

  create(index_def)
end
#delete ⇒ Object
206 207 208 209 210 |
# File 'lib/elasticity/strategies/alias_index.rb', line 206
#
# Deletes every index currently behind the main alias, one request per index.
#
# @return [Array<String>] the list of index names returned by main_indexes
def delete
  main_indexes.each { |index_name| @client.index_delete(index: index_name) }
end
#delete_by_query(body) ⇒ Object
247 248 249 |
# File 'lib/elasticity/strategies/alias_index.rb', line 247
#
# Runs a delete-by-query against the main alias.
#
# @param body query body forwarded unchanged to the client
# @return the client's response
def delete_by_query(body)
  target_alias = @main_alias
  @client.delete_by_query(index: target_alias, body: body)
end
#delete_document(id) ⇒ Object
231 232 233 234 235 236 237 |
# File 'lib/elasticity/strategies/alias_index.rb', line 231
#
# Deletes the document with +id+ from every index behind either alias (the
# union of main_indexes and update_indexes), in a single bulk request.
#
# NOTE(review): when both aliases resolve to no index, bulk is called with an
# empty body; how the client reacts to that is not visible here.
#
# @param id document id
# @return the client's bulk response
def delete_document(id)
  target_indexes = main_indexes | update_indexes
  delete_ops = target_indexes.map { |index_name| { delete: { _index: index_name, _id: id } } }
  @client.bulk(body: delete_ops)
end
#delete_if_defined ⇒ Object
212 213 214 |
# File 'lib/elasticity/strategies/alias_index.rb', line 212
#
# Deletes the indexes behind the main alias unless the aliases are missing.
#
# @return the result of delete, or nil when there was nothing to delete
def delete_if_defined
  return if missing?

  delete
end
#flush ⇒ Object
257 258 259 |
# File 'lib/elasticity/strategies/alias_index.rb', line 257
#
# Flushes the index(es) behind the update alias.
#
# @return the client's response
def flush
  target_alias = @update_alias
  @client.index_flush(index: target_alias)
end
#get_document(id) ⇒ Object
239 240 241 |
# File 'lib/elasticity/strategies/alias_index.rb', line 239
#
# Fetches a single document by id through the main alias.
#
# @param id document id
# @return the client's get response
def get_document(id)
  lookup = { index: @main_alias, id: id }
  @client.get(**lookup)
end
#index_document(id, attributes) ⇒ Object
221 222 223 224 225 226 227 228 229 |
# File 'lib/elasticity/strategies/alias_index.rb', line 221
#
# Indexes +attributes+ under +id+ through the update alias.
#
# @param id document id sent to the client (the returned id comes from the response)
# @param attributes document body
# @return [Array] two elements: the response's "_id", and the shard check —
#   true/false from "_shards"["successful"].to_i > 0, or nil when the response
#   has no "_shards" entry
# @raise [IndexError] when the response carries no "_id"; IndexError here takes
#   the alias name and a message, so it is presumably the library's own error
#   class — TODO confirm
def index_document(id, attributes)
  response = @client.index(index: @update_alias, id: id, body: attributes)
  stored_id = response["_id"]
  raise IndexError.new(@update_alias, "failed to index document. Response: #{response.inspect}") unless stored_id

  shards = response["_shards"]
  [stored_id, shards && shards["successful"].to_i > 0]
end
#main_indexes ⇒ Object
175 176 177 178 179 |
# File 'lib/elasticity/strategies/alias_index.rb', line 175
#
# Names of the indexes matching "<main alias>-*" that carry the main alias.
#
# @return [Array<String>] index names, or [] when the client raises NotFound
def main_indexes
  alias_map = @client.index_get_alias(index: "#{@main_alias}-*", name: @main_alias)
  alias_map.keys
rescue Elastic::Transport::Transport::Errors::NotFound
  []
end
#mapping ⇒ Object
279 280 281 282 283 |
# File 'lib/elasticity/strategies/alias_index.rb', line 279
#
# Mapping of the first index in the client's get-mapping response for the
# main alias.
#
# @return the first value of the response, or nil when the client raises NotFound
def mapping
  per_index = @client.index_get_mapping(index: @main_alias)
  per_index.values.first
rescue Elastic::Transport::Transport::Errors::NotFound
  nil
end
#mappings ⇒ Object
271 272 273 274 275 276 277 |
# File 'lib/elasticity/strategies/alias_index.rb', line 271
#
# Deprecated alias of #mapping: emits a deprecation warning, then delegates.
#
# NOTE(review): ActiveSupport::Deprecation.warn as a singleton call is itself
# deprecated in newer ActiveSupport releases — confirm against the supported
# Rails versions.
#
# @return the result of #mapping
def mappings
  ActiveSupport::Deprecation.warn("Elasticity::Strategies::AliasIndex#mappings is deprecated, use mapping instead")
  mapping
end
#missing? ⇒ Boolean
171 172 173 |
# File 'lib/elasticity/strategies/alias_index.rb', line 171
#
# True only when status reports :missing (neither alias exists). An
# :inconsistent state (exactly one alias exists) is not treated as missing.
#
# @return [Boolean]
def missing?
  status.eql?(:missing)
end
#recreate(index_def) ⇒ Object
216 217 218 219 |
# File 'lib/elasticity/strategies/alias_index.rb', line 216
#
# Drops the current indexes (if any) and creates a fresh one from +index_def+.
#
# @param index_def definition forwarded to create
# @return the result of create
def recreate(index_def)
  # Remove whatever sits behind the main alias; a no-op when the aliases are missing...
  delete_if_defined
  # ...so create's missing? check passes and both aliases point at the new index.
  create(index_def)
end
#ref_index_name ⇒ Object
21 22 23 |
# File 'lib/elasticity/strategies/alias_index.rb', line 21
#
# Name used to refer to this index from outside: the main alias.
#
# @return [String] the main alias (the index_base_name given to the constructor)
def ref_index_name
  main_alias_name = @main_alias
  main_alias_name
end
#refresh ⇒ Object
261 262 263 |
# File 'lib/elasticity/strategies/alias_index.rb', line 261
#
# Refreshes the index(es) behind the update alias.
#
# @return the client's response
def refresh
  target_alias = @update_alias
  @client.index_refresh(index: target_alias)
end
#remap(index_def, retry_delete_on_recoverable_errors: false, retry_delay: 0, max_delay: 0) ⇒ Object
Remap allows zero-downtime, zero-data-loss remapping of Elasticsearch indexes. Here is an overview of how it works:
-
Creates a new index with the new mapping
-
Updates the aliases so that writes go to the new index and reads go to both indexes.
-
Uses scan and scroll to iterate over all the documents in the old index, moving them to the new index.
-
Updates the aliases so that all operations go to the new index.
-
Deletes the old index.
It does a little more to ensure consistency and to handle race conditions; for details, see the implementation.
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/elasticity/strategies/alias_index.rb', line 37
#
# Remaps the index behind the aliases without downtime and without losing
# writes made while the migration runs.
#
# Steps:
#   1. create a new index from +index_def+;
#   2. move the update alias to the new index and add the new index to the
#      main alias, so reads see both indexes while writes go to the new one;
#   3. scroll over the original index in pages of 100, re-fetch each page with
#      mget, and bulk-copy the documents that still exist into the new index,
#      dropping any _source field not listed in index_def[:mappings]["properties"];
#   4. delete from the new index any copied document that disappeared from the
#      original index while the page was being copied;
#   5. remove the original index from the main alias and delete it (optionally
#      retrying the delete on recoverable errors).
#
# If anything raises after the new index was created, both aliases are
# restored to the original index, the documents present in the new index are
# copied back into the original index, the new index is deleted, and the
# error is re-raised.
#
# NOTE(review): scroll contexts are never explicitly cleared; they expire via
# their scroll timeout.
#
# @param index_def [Hash] definition passed to create_index; must provide
#   index_def[:mappings]["properties"] when the original index has documents
# @param retry_delete_on_recoverable_errors [Boolean] retry deleting the
#   original index on errors accepted by retryable_error?
# @param retry_delay [Numeric] seconds to sleep between delete attempts
# @param max_delay [Numeric] upper bound on the accumulated retry sleep time
# @raise [RuntimeError] when the aliases do not both point to exactly one,
#   identical index (e.g. another remap is in progress)
def remap(index_def, retry_delete_on_recoverable_errors: false, retry_delay: 0, max_delay: 0)
  main_indexes = self.main_indexes
  update_indexes = self.update_indexes

  if main_indexes.size != 1 || update_indexes.size != 1 || main_indexes != update_indexes
    raise "Index can't be remapped right now, check if another remapping is already happening"
  end

  new_index = create_index(index_def)
  original_index = main_indexes[0]

  begin
    # Configure aliases so that search includes the old index and the new index, and writes are made to
    # the new index.
    @client.index_update_aliases(body: {
      actions: [
        { remove: { index: original_index, alias: @update_alias } },
        { add: { index: new_index, alias: @update_alias } },
        { add: { index: new_index, alias: @main_alias } },
      ]
    })

    @client.index_refresh(index: original_index)
    cursor = @client.search index: original_index, search_type: :query_then_fetch, scroll: "10m", size: 100

    # Fields allowed by the new mapping. Computed on first use and then reused for every page,
    # instead of being recomputed inside the loop.
    defined_mapping_fields = nil

    loop do
      hits = cursor["hits"]["hits"]
      break if hits.empty?

      # Fetch documents based on the ids that existed when the migration started, to make sure we only migrate
      # documents that haven't been deleted.
      id_docs = hits.map do |hit|
        { _index: original_index, _id: hit["_id"] }
      end
      docs = @client.mget(body: { docs: id_docs }, refresh: true)["docs"]
      break if docs.empty?

      # Modify document hashes to match the mapping definition so that legacy fields aren't added
      defined_mapping_fields ||= index_def[:mappings]["properties"].keys

      # Move only documents that still exist on the old index, into the new index.
      ops = []
      docs.each do |doc|
        if doc["found"]
          legacy_fields = doc["_source"].keys - defined_mapping_fields
          legacy_fields.each { |field| doc["_source"].delete(field) }
          ops << { index: { _index: new_index, _id: doc["_id"], data: doc["_source"] } }
        end
      end
      # Every document of this page may have been deleted since the scroll started. An empty bulk
      # request is rejected by Elasticsearch and would abort (and roll back) the whole remap, so skip
      # it, exactly like the race-condition bulk below.
      @client.bulk(body: ops) unless ops.empty?

      # Deal with race conditions by removing from the new index any document that doesn't exist in the old index anymore.
      ops = []
      @client.mget(body: { docs: id_docs }, refresh: true)["docs"].each_with_index do |new_doc, idx|
        if docs[idx]["found"] && !new_doc["found"]
          ops << { delete: { _index: new_index, _id: new_doc["_id"] } }
        end
      end
      @client.bulk(body: ops) unless ops.empty?

      cursor = @client.scroll(scroll_id: cursor["_scroll_id"], scroll: "1m", body: { scroll_id: cursor["_scroll_id"] })
    end

    # Update aliases to only point to the new index.
    @client.index_update_aliases(body: {
      actions: [
        { remove: { index: original_index, alias: @main_alias } },
      ]
    })

    waiting_duration = 0
    begin
      @client.index_delete(index: original_index)
    rescue Elastic::Transport::Transport::ServerError => e
      # Retry only when enabled by the caller, when retryable_error? accepts the error (presumably
      # matching RETRYABLE_ERROR_SNIPPETS, e.g. a snapshot in progress — retryable_error? is defined
      # elsewhere), and while the accumulated sleep is still below max_delay.
      if retryable_error?(e) && retry_delete_on_recoverable_errors && waiting_duration < max_delay
        waiting_duration += retry_delay
        sleep(retry_delay)
        retry
      else
        raise e
      end
    end
  rescue
    # Roll back: point both aliases at the original index again and stop writing to the new one.
    @client.index_update_aliases(body: {
      actions: [
        { add: { index: original_index, alias: @main_alias } },
        { add: { index: original_index, alias: @update_alias } },
        { remove: { index: new_index, alias: @update_alias } },
      ]
    })

    @client.index_refresh(index: new_index)
    cursor = @client.search index: new_index, search_type: :query_then_fetch, scroll: "1m", size: 100

    loop do
      hits = cursor["hits"]["hits"]
      break if hits.empty?

      # Move all the documents that exist on the new index back to the old index
      ops = []
      hits.each do |doc|
        ops << { index: { _index: original_index, _id: doc["_id"], data: doc["_source"] } }
      end
      @client.bulk(body: ops)

      cursor = @client.scroll(scroll_id: cursor["_scroll_id"], scroll: "1m")
    end

    @client.index_refresh(index: original_index)
    @client.index_update_aliases(body: {
      actions: [
        { remove: { index: new_index, alias: @main_alias } },
      ]
    })
    @client.index_delete(index: new_index)

    # Re-raise the error that triggered the rollback.
    raise
  end
end
#search_index ⇒ Object
243 244 245 |
# File 'lib/elasticity/strategies/alias_index.rb', line 243
#
# Name to search against: the main alias, which covers every index currently
# attached to it.
#
# @return [String] the main alias
def search_index
  search_target = @main_alias
  search_target
end
#settings ⇒ Object
265 266 267 268 269 |
# File 'lib/elasticity/strategies/alias_index.rb', line 265
#
# Settings of the first index in the client's get-settings response for the
# main alias.
#
# @return the first value of the response, or nil when the client raises NotFound
def settings
  settings_by_index = @client.index_get_settings(index: @main_alias)
  settings_by_index.values.first
rescue Elastic::Transport::Transport::Errors::NotFound
  nil
end
#status ⇒ Object
157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/elasticity/strategies/alias_index.rb', line 157
#
# Reports the state of the two aliases. Both aliases are always queried.
#
# NOTE(review): :inconsistent is returned here but is not listed in the
# STATUSES constant ([:missing, :ok]).
#
# @return [Symbol] :ok when both aliases exist, :missing when neither exists,
#   :inconsistent when exactly one exists
def status
  search_alias_present = @client.index_exists_alias(name: @main_alias)
  update_alias_present = @client.index_exists_alias(name: @update_alias)

  return :ok if search_alias_present && update_alias_present
  return :missing unless search_alias_present || update_alias_present

  :inconsistent
end
#update_indexes ⇒ Object
181 182 183 184 185 |
# File 'lib/elasticity/strategies/alias_index.rb', line 181
#
# Names of the indexes matching "<main alias>-*" that carry the update alias.
#
# @return [Array<String>] index names, or [] when the client raises NotFound
def update_indexes
  alias_map = @client.index_get_alias(index: "#{@main_alias}-*", name: @update_alias)
  alias_map.keys
rescue Elastic::Transport::Transport::Errors::NotFound
  []
end