Class: Elasticity::Strategies::AliasIndex
- Inherits: Object
  - Object
    - Elasticity::Strategies::AliasIndex
- Defined in:
- lib/elasticity/strategies/alias_index.rb
Overview
This strategy keeps two aliases, a main alias and an update alias, that may point to the same index or to different indexes. This allows the underlying index to be changed at runtime simply by updating the aliases atomically. See the implementation of #remap for an example.
Constant Summary
- STATUSES =
[:missing, :ok]
Instance Method Summary
- #bulk {|b| ... } ⇒ Object
- #create(index_def) ⇒ Object
- #create_if_undefined(index_def) ⇒ Object
- #delete ⇒ Object
- #delete_by_query(type, body) ⇒ Object
- #delete_document(type, id) ⇒ Object
- #delete_if_defined ⇒ Object
- #flush ⇒ Object
- #get_document(type, id) ⇒ Object
- #index_document(type, id, attributes) ⇒ Object
- #initialize(client, index_base_name) ⇒ AliasIndex (constructor)
  A new instance of AliasIndex.
- #main_indexes ⇒ Object
- #mappings ⇒ Object
- #missing? ⇒ Boolean
- #recreate(index_def) ⇒ Object
- #ref_index_name ⇒ Object
- #remap(index_def) ⇒ Object
  Remap allows zero-downtime/zero-data-loss remapping of Elasticsearch indexes.
- #search(type, body) ⇒ Object
- #settings ⇒ Object
- #status ⇒ Object
- #update_indexes ⇒ Object
Constructor Details
#initialize(client, index_base_name) ⇒ AliasIndex
Returns a new instance of AliasIndex.
9 10 11 12 13 |
# File 'lib/elasticity/strategies/alias_index.rb', line 9
#
# Sets up the strategy around two alias names derived from +index_base_name+.
# The main alias is the base name itself. The update alias is the base name
# followed by "_update".
#
# @param client [Object] client wrapper used for every Elasticsearch call
# @param index_base_name [String] base name both aliases are derived from
def initialize(client, index_base_name)
  @client = client
  @main_alias = index_base_name
  # Derived from the main alias, which is the same object as index_base_name.
  @update_alias = "#{@main_alias}_update"
end
Instance Method Details
#bulk {|b| ... } ⇒ Object
223 224 225 226 227 |
# File 'lib/elasticity/strategies/alias_index.rb', line 223
#
# Yields a Bulk::Alias batch bound to the update alias, then executes it.
# The current main indexes are also handed to the batch. Presumably this lets
# it address every index behind the main alias; confirm in Bulk::Alias.
#
# @yieldparam b [Bulk::Alias] batch to append operations to
# @return the result of Bulk::Alias#execute
def bulk
  Bulk::Alias.new(@client, @update_alias, main_indexes)
    .tap { |batch| yield batch }
    .execute
end
#create(index_def) ⇒ Object
162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/elasticity/strategies/alias_index.rb', line 162
#
# Creates a concrete index from +index_def+ (via create_index) and, in a
# single alias update, attaches both the main alias and the update alias to it.
#
# @param index_def definition forwarded to create_index
# @return the result of the client's index_update_aliases call
# @raise [IndexError] when the index is not missing. NOTE(review): presumably
#   the project's IndexError (alias name + message), not Ruby's built-in one.
def create(index_def)
  raise IndexError.new(@main_alias, "index already exists") unless missing?

  index_name = create_index(index_def)
  alias_actions = [@main_alias, @update_alias].map do |alias_name|
    { add: { index: index_name, alias: alias_name } }
  end
  @client.index_update_aliases(body: { actions: alias_actions })
end
#create_if_undefined(index_def) ⇒ Object
176 177 178 |
# File 'lib/elasticity/strategies/alias_index.rb', line 176
#
# Calls #create only when the index is currently missing.
#
# @param index_def definition forwarded to #create
# @return the result of #create, or nil when nothing was created
def create_if_undefined(index_def)
  return unless missing?

  create(index_def)
end
#delete ⇒ Object
180 181 182 |
# File 'lib/elasticity/strategies/alias_index.rb', line 180
#
# Deletes every concrete index matching "<main alias>-*". This is the same
# pattern main_indexes and update_indexes query.
# NOTE(review): the wildcard also matches unrelated indexes whose name merely
# starts with "<main alias>-"; confirm no other base name shares that prefix.
#
# @return the result of the client's index_delete call
def delete
  index_pattern = "#{@main_alias}-*"
  @client.index_delete(index: index_pattern)
end
#delete_by_query(type, body) ⇒ Object
219 220 221 |
# File 'lib/elasticity/strategies/alias_index.rb', line 219
#
# Forwards a delete-by-query request, scoped to the main alias, to the client.
#
# @param type document type to restrict the query to
# @param body query body passed through unchanged
# @return the client's response
def delete_by_query(type, body)
  request = { index: @main_alias, type: type, body: body }
  @client.delete_by_query(**request)
end
#delete_document(type, id) ⇒ Object
203 204 205 206 207 208 209 |
# File 'lib/elasticity/strategies/alias_index.rb', line 203
#
# Deletes the document from every index behind either alias, in one bulk request.
# Indexes are de-duplicated, so a shared index gets one delete.
# NOTE(review): if neither alias resolves to any index, an empty bulk body is sent.
#
# @param type document type
# @param id document id
# @return the client's bulk response
def delete_document(type, id)
  target_indexes = main_indexes | update_indexes
  delete_ops = target_indexes.map do |index_name|
    { delete: { _index: index_name, _type: type, _id: id } }
  end
  @client.bulk(body: delete_ops)
end
#delete_if_defined ⇒ Object
184 185 186 |
# File 'lib/elasticity/strategies/alias_index.rb', line 184
#
# Calls #delete unless the index is missing.
#
# @return the result of #delete, or nil when the index was missing
def delete_if_defined
  return if missing?

  delete
end
#flush ⇒ Object
229 230 231 |
# File 'lib/elasticity/strategies/alias_index.rb', line 229
#
# Flushes the index (or indexes) behind the update alias.
#
# @return the client's index_flush response
def flush
  flush_request = { index: @update_alias }
  @client.index_flush(**flush_request)
end
#get_document(type, id) ⇒ Object
211 212 213 |
# File 'lib/elasticity/strategies/alias_index.rb', line 211
#
# Fetches a single document through the main alias.
#
# @param type document type
# @param id document id
# @return the client's get response
def get_document(type, id)
  lookup = { index: @main_alias, type: type, id: id }
  @client.get(**lookup)
end
#index_document(type, id, attributes) ⇒ Object
193 194 195 196 197 198 199 200 201 |
# File 'lib/elasticity/strategies/alias_index.rb', line 193
#
# Indexes +attributes+ as a document through the update alias.
#
# @param type document type
# @param id document id sent with the request
# @param attributes document body
# @return [Array] the "_id" from the response and its "created" value
# @raise [IndexError] when the response carries no "_id". NOTE(review):
#   presumably the project's IndexError (alias name + message), not Ruby's.
def index_document(type, id, attributes)
  res = @client.index(index: @update_alias, type: type, id: id, body: attributes)
  # Use a separate name rather than reassigning the +id+ parameter inside the
  # condition, which obscured where the returned id comes from.
  doc_id = res["_id"]
  raise IndexError.new(@update_alias, "failed to index document") unless doc_id

  [doc_id, res["created"]]
end
#main_indexes ⇒ Object
150 151 152 153 154 |
# File 'lib/elasticity/strategies/alias_index.rb', line 150
#
# Lists the concrete indexes matching "<main alias>-*" that carry the main alias.
#
# @return [Array] index names (keys of the get-aliases response); [] when the
#   client raises NotFound
def main_indexes
  alias_map = @client.index_get_aliases(index: "#{@main_alias}-*", name: @main_alias)
  alias_map.keys
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  []
end
#mappings ⇒ Object
241 242 243 244 245 246 247 |
# File 'lib/elasticity/strategies/alias_index.rb', line 241
#
# Returns the "mappings" section of the index behind the main alias.
#
# Elasticsearch keys a get-mapping response by concrete index name, not by the
# alias that was requested. Looking the response up by alias alone therefore
# yields nil. The alias key is still tried first, for backward compatibility;
# otherwise the first index entry in the response is used. While a remap is
# running the main alias can resolve to two indexes. In that case, which entry
# comes first is not guaranteed.
#
# @return [Hash, nil] the mappings, or nil when the response is empty or the
#   client raises NotFound
def mappings
  response = @client.index_get_mapping(index: @main_alias)
  index_entry = response[@main_alias] || response.values.first
  index_entry && index_entry["mappings"]
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  nil
end
#missing? ⇒ Boolean
146 147 148 |
# File 'lib/elasticity/strategies/alias_index.rb', line 146
#
# True when #status reports :missing. An :inconsistent status is not "missing".
#
# @return [Boolean]
def missing?
  :missing == status
end
#recreate(index_def) ⇒ Object
188 189 190 191 |
# File 'lib/elasticity/strategies/alias_index.rb', line 188
#
# Deletes the existing indexes (only if the aliases are defined) and then
# creates a fresh one from +index_def+.
#
# @param index_def definition forwarded to #create
# @return the result of #create
def recreate(index_def)
  delete_if_defined
  create(index_def)
end
#ref_index_name ⇒ Object
15 16 17 |
# File 'lib/elasticity/strategies/alias_index.rb', line 15
#
# The name other code should use to refer to this index: the main alias.
#
# @return [String]
def ref_index_name
  @main_alias
end
#remap(index_def) ⇒ Object
Remap allows zero-downtime/zero-data-loss remapping of Elasticsearch indexes. Here is an overview of how it works:
- Create a new index with the new mapping.
- Update the aliases so that all writes go to the new index and reads go to both indexes.
- Use scan and scroll to iterate over all the documents in the old index, copying them to the new index.
- Update the aliases so that all operations go to the new index.
- Delete the old index.
It does a little more to ensure consistency and to handle race conditions. If anything fails, the aliases are restored, the copied documents are moved back, and the new index is deleted. For details, see the implementation.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/elasticity/strategies/alias_index.rb', line 31
#
# Remaps the index behind the aliases to a new index built from +index_def+,
# without downtime or data loss:
#
# 1. Create the new index.
# 2. In one alias update, move the update alias to the new index and add the
#    new index to the main alias. Writes then go to the new index while
#    reads see both indexes.
# 3. Scan/scroll the old index and copy every document that still exists into
#    the new index. Undo copies of documents deleted while a batch was in flight.
# 4. Remove the old index from the main alias and delete it.
#
# If a StandardError is raised after the new index has been created:
# - the update alias is pointed back at the original index;
# - every document in the new index is copied back into the original one;
# - the new index is removed from the main alias and deleted;
# - the error is re-raised.
#
# @param index_def definition forwarded to create_index
# @raise [RuntimeError] when the main and update aliases do not both point at
#   the same single index (e.g. another remap is already running)
def remap(index_def)
  main_indexes = self.main_indexes
  update_indexes = self.update_indexes

  if main_indexes.size != 1 || update_indexes.size != 1 || main_indexes != update_indexes
    raise "Index can't be remapped right now, check if another remapping is already happening"
  end

  new_index = create_index(index_def)
  original_index = main_indexes[0]

  begin
    # Configure aliases so that search includes the old index and the new index, and writes are made to
    # the new index.
    @client.index_update_aliases(body: {
      actions: [
        { remove: { index: original_index, alias: @update_alias } },
        { add: { index: new_index, alias: @update_alias } },
        { add: { index: new_index, alias: @main_alias } },
      ]
    })

    @client.index_flush(index: original_index)
    # Hits of this initial scan search are not read; batches come from #scroll below.
    cursor = @client.search index: original_index, search_type: 'scan', scroll: '1m', _source: false, size: 100
    loop do
      cursor = @client.scroll(scroll_id: cursor['_scroll_id'], scroll: '1m')
      hits = cursor['hits']['hits']
      break if hits.empty?

      # Fetch documents based on the ids that existed when the migration started, to make sure we only migrate
      # documents that haven't been deleted.
      id_docs = hits.map do |hit|
        { _index: original_index, _type: hit["_type"], _id: hit["_id"] }
      end

      docs = @client.mget(body: { docs: id_docs }, refresh: true)["docs"]
      break if docs.empty?

      # Copy only the documents that still exist in the old index into the new index.
      copy_ops = docs.select { |doc| doc["found"] }.map do |doc|
        { index: { _index: new_index, _type: doc["_type"], _id: doc["_id"], data: doc["_source"] } }
      end
      # Every document of this page may have been deleted since the scroll
      # produced it. Elasticsearch rejects a bulk request with no actions, which
      # would needlessly abort and roll back the whole remap. Skip it instead,
      # like the cleanup bulk below.
      @client.bulk(body: copy_ops) unless copy_ops.empty?

      # Deal with race conditions: re-read the same ids from the OLD index and
      # remove from the new index any document that was copied above but has
      # been deleted from the old index in the meantime.
      current_docs = @client.mget(body: { docs: id_docs }, refresh: true)["docs"]
      cleanup_ops = []
      current_docs.each_with_index do |current_doc, idx|
        if docs[idx]["found"] && !current_doc["found"]
          cleanup_ops << { delete: { _index: new_index, _type: current_doc["_type"], _id: current_doc["_id"] } }
        end
      end
      @client.bulk(body: cleanup_ops) unless cleanup_ops.empty?
    end

    # Update aliases to only point to the new index.
    @client.index_update_aliases(body: {
      actions: [
        { remove: { index: original_index, alias: @main_alias } },
      ]
    })
    @client.index_delete(index: original_index)
  rescue StandardError
    # Roll back: send writes to the original index again and stop writing to the new one.
    @client.index_update_aliases(body: {
      actions: [
        { add: { index: original_index, alias: @update_alias } },
        { remove: { index: new_index, alias: @update_alias } },
      ]
    })

    @client.index_flush(index: new_index)
    cursor = @client.search index: new_index, search_type: 'scan', scroll: '1m', size: 100
    loop do
      cursor = @client.scroll(scroll_id: cursor['_scroll_id'], scroll: '1m')
      hits = cursor['hits']['hits']
      break if hits.empty?

      # Move all the documents that exist in the new index back to the old index.
      restore_ops = hits.map do |doc|
        { index: { _index: original_index, _type: doc["_type"], _id: doc["_id"], data: doc["_source"] } }
      end
      @client.bulk(body: restore_ops)
    end

    @client.index_flush(index: original_index)
    @client.index_update_aliases(body: {
      actions: [
        { remove: { index: new_index, alias: @main_alias } },
      ]
    })
    @client.index_delete(index: new_index)

    # Re-raise the error that triggered the rollback.
    raise
  end
end
#search(type, body) ⇒ Object
215 216 217 |
# File 'lib/elasticity/strategies/alias_index.rb', line 215
#
# Builds a search facade over the main alias for the given type and body.
#
# @param type document type to search
# @param body search body
# @return [Search::Facade]
def search(type, body)
  definition = Search::Definition.new(@main_alias, type, body)
  Search::Facade.new(@client, definition)
end
#settings ⇒ Object
233 234 235 236 237 238 239 |
# File 'lib/elasticity/strategies/alias_index.rb', line 233
#
# Returns the "settings" section of the index behind the main alias.
#
# Elasticsearch keys a get-settings response by concrete index name, not by the
# alias that was requested. Looking the response up by alias alone therefore
# yields nil. The alias key is still tried first, for backward compatibility;
# otherwise the first index entry in the response is used. While a remap is
# running the main alias can resolve to two indexes. In that case, which entry
# comes first is not guaranteed.
#
# @return [Hash, nil] the settings, or nil when the response is empty or the
#   client raises NotFound
def settings
  response = @client.index_get_settings(index: @main_alias)
  index_entry = response[@main_alias] || response.values.first
  index_entry && index_entry["settings"]
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  nil
end
#status ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/elasticity/strategies/alias_index.rb', line 132
#
# Reports whether the two aliases exist.
#
# @return [Symbol] :ok when both the main and the update alias exist,
#   :missing when neither does, :inconsistent when only one of them does.
#   NOTE(review): STATUSES lists only :missing and :ok, not :inconsistent.
def status
  main_present = @client.index_exists_alias(name: @main_alias)
  update_present = @client.index_exists_alias(name: @update_alias)

  if main_present && update_present
    :ok
  elsif main_present || update_present
    :inconsistent
  else
    :missing
  end
end
#update_indexes ⇒ Object
156 157 158 159 160 |
# File 'lib/elasticity/strategies/alias_index.rb', line 156
#
# Lists the concrete indexes matching "<main alias>-*" that carry the update alias.
#
# @return [Array] index names (keys of the get-aliases response); [] when the
#   client raises NotFound
def update_indexes
  alias_map = @client.index_get_aliases(index: "#{@main_alias}-*", name: @update_alias)
  alias_map.keys
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  []
end