Class: Elasticity::Strategies::AliasIndex

Inherits:
Object
  • Object
show all
Defined in:
lib/elasticity/strategies/alias_index.rb

Overview

This strategy keeps two aliases that may point to the same index or to different indexes, allowing runtime changes by atomically updating the aliases. For an example, see the implementation of the remap method.

Constant Summary collapse

# Error-message fragment that marks a failed index deletion as caused by an
# in-progress snapshot (presumably matched against Elasticsearch error text —
# confirm against retryable_error?).
SNAPSHOT_ERROR_SNIPPET =
"Cannot delete indices that are being snapshotted"
# Message fragments considered retryable.
RETRYABLE_ERROR_SNIPPETS =
[
  SNAPSHOT_ERROR_SNIPPET
].freeze
# Known index statuses. Frozen so the shared constant cannot be mutated at
# runtime. NOTE(review): #status can also return :inconsistent, which is not
# listed here — confirm whether that omission is intentional.
STATUSES =
[:missing, :ok].freeze

Instance Method Summary collapse

Constructor Details

#initialize(client, index_base_name, document_type, use_new_timestamp_format = nil, include_type_name_on_create = nil) ⇒ AliasIndex

Returns a new instance of AliasIndex.



14
15
16
17
18
19
# File 'lib/elasticity/strategies/alias_index.rb', line 14

# Binds the strategy to a client and derives the two alias names it manages.
#
# @param client object the other methods send their Elasticsearch calls to
# @param index_base_name used verbatim as the main alias; the update alias
#   is this name with "_update" appended
# @param document_type stored in @document_type
# @param use_new_timestamp_format accepted but not referenced in this body
#   (NOTE(review): possibly consumed elsewhere or kept for compatibility — confirm)
# @param include_type_name_on_create accepted but not referenced in this body
#   (NOTE(review): same caveat as above)
def initialize(client, index_base_name, document_type, use_new_timestamp_format = nil, include_type_name_on_create = nil)
  @client       = client
  @main_alias   = index_base_name
  @update_alias = "#{index_base_name}_update"
  @document_type = document_type
end

Instance Method Details

#bulk {|b| ... } ⇒ Object

Yields:

  • (b)


251
252
253
254
255
# File 'lib/elasticity/strategies/alias_index.rb', line 251

# Collects operations in a Bulk::Alias and executes them in one go.
#
# The batch is built from the client, the update alias and the indexes
# currently returned by #main_indexes.
#
# @yield [batch] the Bulk::Alias instance to add operations to
# @return the result of Bulk::Alias#execute
def bulk
  Bulk::Alias.new(@client, @update_alias, main_indexes).tap { |batch| yield batch }.execute
end

#create(index_def) ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/elasticity/strategies/alias_index.rb', line 187

# Creates a new index from +index_def+ and points both aliases at it.
#
# @param index_def definition handed to create_index
# @return the result of the client's index_update_aliases call
# @raise [IndexError] when #missing? is false (the status is anything other
#   than :missing)
def create(index_def)
  raise IndexError.new(@main_alias, "index already exists") unless missing?

  # Remember the concrete index name that now backs the aliases.
  @created_index_name = create_index(index_def)

  alias_actions = [@main_alias, @update_alias].map do |alias_name|
    { add: { index: @created_index_name, alias: alias_name } }
  end

  @client.index_update_aliases(body: { actions: alias_actions })
end

#create_if_undefined(index_def) ⇒ Object



202
203
204
# File 'lib/elasticity/strategies/alias_index.rb', line 202

# Creates the index only when it is currently missing; otherwise does nothing.
#
# @param index_def definition passed to #create
# @return the result of #create, or nil when the index is not missing
def create_if_undefined(index_def)
  return unless missing?

  create(index_def)
end

#delete ⇒ Object



206
207
208
209
210
# File 'lib/elasticity/strategies/alias_index.rb', line 206

# Deletes every index returned by #main_indexes.
#
# @return [Array] the list of index names that was iterated
def delete
  main_indexes.each { |index_name| @client.index_delete(index: index_name) }
end

#delete_by_query(body) ⇒ Object



247
248
249
# File 'lib/elasticity/strategies/alias_index.rb', line 247

# Runs a delete-by-query request against the main alias.
#
# @param body query body forwarded unchanged to the client
# @return the client's response
def delete_by_query(body)
  target_alias = @main_alias
  @client.delete_by_query(index: target_alias, body: body)
end

#delete_document(id) ⇒ Object



231
232
233
234
235
236
237
# File 'lib/elasticity/strategies/alias_index.rb', line 231

# Deletes the document with +id+ from every index behind either alias.
#
# The union of #main_indexes and #update_indexes is used, so an index that
# sits behind both aliases receives a single delete operation.
#
# @param id identifier of the document to delete
# @return the client's bulk response
def delete_document(id)
  target_indexes = main_indexes | update_indexes
  delete_ops = target_indexes.map { |index_name| { delete: { _index: index_name, _id: id } } }

  @client.bulk(body: delete_ops)
end

#delete_if_defined ⇒ Object



212
213
214
# File 'lib/elasticity/strategies/alias_index.rb', line 212

# Deletes the index unless it is currently missing.
#
# @return the result of #delete, or nil when the index is missing
def delete_if_defined
  return if missing?

  delete
end

#flush ⇒ Object



257
258
259
# File 'lib/elasticity/strategies/alias_index.rb', line 257

# Asks the client to flush the index(es) behind the update alias.
#
# @return the client's response
def flush
  write_alias = @update_alias
  @client.index_flush(index: write_alias)
end

#get_document(id) ⇒ Object



239
240
241
# File 'lib/elasticity/strategies/alias_index.rb', line 239

# Fetches a single document by id through the main alias.
#
# @param id identifier of the document to fetch
# @return the client's response
def get_document(id)
  read_alias = @main_alias
  @client.get(index: read_alias, id: id)
end

#index_document(id, attributes) ⇒ Object



221
222
223
224
225
226
227
228
229
# File 'lib/elasticity/strategies/alias_index.rb', line 221

# Indexes a document through the update alias.
#
# @param id identifier to index the document under
# @param attributes document body
# @return [Array] the "_id" from the response, followed by a success flag:
#   true/false depending on whether "_shards"/"successful" is greater than
#   zero, or nil when the response carries no "_shards" entry
# @raise [IndexError] when the response has no "_id"
def index_document(id, attributes)
  response   = @client.index(index: @update_alias, id: id, body: attributes)
  indexed_id = response["_id"]

  unless indexed_id
    raise IndexError.new(@update_alias, "failed to index document. Response: #{response.inspect}")
  end

  shards = response["_shards"]
  [indexed_id, shards && shards["successful"].to_i > 0]
end

#main_indexes ⇒ Object



175
176
177
178
179
# File 'lib/elasticity/strategies/alias_index.rb', line 175

# Lists the names of the indexes that the main alias points to.
#
# Only indexes matching "<main alias>-*" are considered.
#
# @return [Array] index names, or an empty array when the client raises NotFound
def main_indexes
  index_pattern = "#{@main_alias}-*"
  aliased = @client.index_get_alias(index: index_pattern, name: @main_alias)
  aliased.keys
rescue Elastic::Transport::Transport::Errors::NotFound
  []
end

#mapping ⇒ Object



279
280
281
282
283
# File 'lib/elasticity/strategies/alias_index.rb', line 279

# Returns the mapping of the first index the client reports for the main alias.
#
# @return the first value of the client's index_get_mapping response, or nil
#   when the client raises NotFound
def mapping
  mappings_by_index = @client.index_get_mapping(index: @main_alias)
  mappings_by_index.values.first
rescue Elastic::Transport::Transport::Errors::NotFound
  nil
end

#mappings ⇒ Object



271
272
273
274
275
276
277
# File 'lib/elasticity/strategies/alias_index.rb', line 271

# Deprecated alias for #mapping: emits a deprecation warning, then delegates.
#
# @deprecated use #mapping instead
# @return the result of #mapping
def mappings
  deprecation_notice = "Elasticity::Strategies::AliasIndex#mappings is deprecated, use mapping instead"
  ActiveSupport::Deprecation.warn(deprecation_notice)
  mapping
end

#missing? ⇒ Boolean

Returns:

  • (Boolean)


171
172
173
# File 'lib/elasticity/strategies/alias_index.rb', line 171

# Tells whether the index is absent, i.e. #status reports :missing.
#
# @return [Boolean]
def missing?
  current_status = status
  current_status == :missing
end

#recreate(index_def) ⇒ Object



216
217
218
219
# File 'lib/elasticity/strategies/alias_index.rb', line 216

# Replaces the current index with a fresh one built from +index_def+.
#
# @param index_def definition passed to #create
# @return the result of #create
def recreate(index_def)
  # Drop whatever exists first so that #create does not raise "index already exists".
  delete_if_defined
  create(index_def)
end

#ref_index_name ⇒ Object



21
22
23
# File 'lib/elasticity/strategies/alias_index.rb', line 21

# Returns the main alias name, which is the index base name given to the constructor.
def ref_index_name
  @main_alias
end

#refresh ⇒ Object



261
262
263
# File 'lib/elasticity/strategies/alias_index.rb', line 261

# Asks the client to refresh the index(es) behind the update alias.
#
# @return the client's response
def refresh
  write_alias = @update_alias
  @client.index_refresh(index: write_alias)
end

#remap(index_def, retry_delete_on_recoverable_errors: false, retry_delay: 0, max_delay: 0) ⇒ Object

Remap allows zero-downtime/zero-dataloss remap of elasticsearch indexes. Here is the overview of how it works:

  1. Create a new index with the new mapping.

  2. Update the aliases so that all writes go to the new index and reads go to both indexes.

  3. Use scan and scroll to iterate over all the documents in the old index, moving them to the new index.

  4. Update the aliases so that all operations go to the new index.

  5. Delete the old index.

It also does a little more to ensure consistency and to handle race conditions. For more details, look at the implementation.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/elasticity/strategies/alias_index.rb', line 37

# Moves all documents to a new index built from +index_def+ while reads and
# writes keep working through the aliases.
#
# Outline:
#   1. Create the new index.
#   2. Point the update alias at the new index only, and the main alias at
#      both the old and the new index.
#   3. Scroll over the old index and copy every document that still exists
#      there into the new index, dropping fields that are not listed in the
#      new mapping's properties.
#   4. Remove the main alias from the old index.
#   5. Delete the old index.
#
# If any step raises a StandardError, the aliases are pointed back at the old
# index, documents found in the new index are copied back, the new index is
# deleted and the original error is re-raised.
#
# @param index_def index definition passed to create_index; its
#   [:mappings]["properties"] keys decide which document fields are copied
# @param retry_delete_on_recoverable_errors [Boolean] retry the final delete
#   of the old index when retryable_error? accepts the server error
# @param retry_delay passed to sleep between delete attempts
# @param max_delay retrying stops once the accumulated delay reaches this value
# @raise [RuntimeError] when the main and update aliases do not both point at
#   the same single index
def remap(index_def, retry_delete_on_recoverable_errors: false, retry_delay: 0, max_delay: 0)
  main_indexes   = self.main_indexes
  update_indexes = self.update_indexes

  if main_indexes.size != 1 || update_indexes.size != 1 || main_indexes != update_indexes
    raise "Index can't be remapped right now, check if another remapping is already happening"
  end

  new_index      = create_index(index_def)
  original_index = main_indexes[0]

  begin
    # Configure aliases so that search includes the old index and the new index, and writes are made to
    # the new index.
    @client.index_update_aliases(body: {
      actions: [
        { remove: { index: original_index, alias: @update_alias } },
        { add:    { index: new_index, alias: @update_alias } },
        { add:    { index: new_index, alias: @main_alias } },
      ]
    })

    @client.index_refresh(index: original_index)
    cursor = @client.search(index: original_index, search_type: :query_then_fetch, scroll: "10m", size: 100)

    # Field names allowed by the new mapping. Resolved on the first non-empty batch and then reused,
    # because the value does not change between batches.
    defined_mapping_fields = nil

    loop do
      hits = cursor["hits"]["hits"]
      break if hits.empty?

      # Fetch documents based on the ids that existed when the migration started, to make sure we only migrate
      # documents that haven't been deleted.
      id_docs = hits.map do |hit|
        { _index: original_index, _id: hit["_id"] }
      end

      docs = @client.mget(body: { docs: id_docs }, refresh: true)["docs"]
      break if docs.empty?

      # Modify document hashes to match the mapping definition so that legacy fields aren't added
      defined_mapping_fields ||= index_def[:mappings]["properties"].keys

      # Move only documents that still exist on the old index into the new index.
      ops = []
      docs.each do |doc|
        next unless doc["found"]

        source = doc["_source"]
        (source.keys - defined_mapping_fields).each { |field| source.delete(field) }
        ops << { index: { _index: new_index, _id: doc["_id"], data: source } }
      end

      # Every document of this batch may have been deleted since the scroll snapshot was taken. Skip the
      # request in that case: sending an empty bulk body would fail and needlessly roll back the remap.
      @client.bulk(body: ops) unless ops.empty?

      # Deal with race conditions by removing from the new index any document that doesn't exist in the old index anymore.
      ops = []
      @client.mget(body: { docs: id_docs }, refresh: true)["docs"].each_with_index do |new_doc, idx|
        if docs[idx]["found"] && !new_doc["found"]
          ops << { delete: { _index: new_index, _id: new_doc["_id"] } }
        end
      end

      @client.bulk(body: ops) unless ops.empty?
      cursor = @client.scroll(scroll_id: cursor["_scroll_id"], scroll: "1m", body: { scroll_id: cursor["_scroll_id"] })
    end

    # Update aliases to only point to the new index.
    @client.index_update_aliases(body: {
      actions: [
        { remove: { index: original_index, alias: @main_alias } },
      ]
    })

    # NOTE(review): the retry below is bounded only by the accumulated delay, so it terminates only when
    # retry_delay is positive (or max_delay is 0) — confirm callers never pass retry_delay: 0 with a
    # positive max_delay.
    waiting_duration = 0
    begin
      @client.index_delete(index: original_index)
    rescue Elastic::Transport::Transport::ServerError => e
      if retryable_error?(e) && retry_delete_on_recoverable_errors && waiting_duration < max_delay
        waiting_duration += retry_delay
        sleep(retry_delay)
        retry
      else
        raise
      end
    end
  rescue
    # Roll back: send reads and writes to the old index again while the new index is drained.
    @client.index_update_aliases(body: {
      actions: [
        { add:    { index: original_index, alias: @main_alias } },
        { add:    { index: original_index, alias: @update_alias } },
        { remove: { index: new_index, alias: @update_alias } },
      ]
    })

    @client.index_refresh(index: new_index)
    cursor = @client.search(index: new_index, search_type: :query_then_fetch, scroll: "1m", size: 100)
    loop do
      hits = cursor["hits"]["hits"]
      break if hits.empty?

      # Move all the documents that exist on the new index back to the old index
      ops = hits.map do |doc|
        { index: { _index: original_index, _id: doc["_id"], data: doc["_source"] } }
      end

      @client.bulk(body: ops)
      cursor = @client.scroll(scroll_id: cursor["_scroll_id"], scroll: "1m")
    end

    @client.index_refresh(index: original_index)
    @client.index_update_aliases(body: {
      actions: [
        { remove: { index: new_index, alias: @main_alias } },
      ]
    })
    @client.index_delete(index: new_index)

    # Re-raise the error that triggered the rollback.
    raise
  end
end

#search_index ⇒ Object



243
244
245
# File 'lib/elasticity/strategies/alias_index.rb', line 243

# Returns the name to search against: the main alias.
def search_index
  @main_alias
end

#settings ⇒ Object



265
266
267
268
269
# File 'lib/elasticity/strategies/alias_index.rb', line 265

# Returns the settings of the first index the client reports for the main alias.
#
# @return the first value of the client's index_get_settings response, or nil
#   when the client raises NotFound
def settings
  settings_by_index = @client.index_get_settings(index: @main_alias)
  settings_by_index.values.first
rescue Elastic::Transport::Transport::Errors::NotFound
  nil
end

#status ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/elasticity/strategies/alias_index.rb', line 157

# Reports whether the two aliases exist.
#
# Both aliases are always checked, main alias first.
#
# @return [Symbol] :ok when both aliases exist, :missing when neither does,
#   :inconsistent when only one of them exists
def status
  main_present, update_present = [@main_alias, @update_alias].map do |alias_name|
    @client.index_exists_alias(name: alias_name)
  end

  if main_present && update_present
    :ok
  elsif !main_present && !update_present
    :missing
  else
    :inconsistent
  end
end

#update_indexes ⇒ Object



181
182
183
184
185
# File 'lib/elasticity/strategies/alias_index.rb', line 181

# Lists the names of the indexes that the update alias points to.
#
# Only indexes matching "<main alias>-*" are considered.
#
# @return [Array] index names, or an empty array when the client raises NotFound
def update_indexes
  index_pattern = "#{@main_alias}-*"
  aliased = @client.index_get_alias(index: index_pattern, name: @update_alias)
  aliased.keys
rescue Elastic::Transport::Transport::Errors::NotFound
  []
end