Class: Elasticity::Strategies::AliasIndex

Inherits:
Object
  • Object
show all
Defined in:
lib/elasticity/strategies/alias_index.rb

Overview

This strategy keeps two aliases that may point to the same index or to different indexes, which allows changes at runtime by atomically updating the aliases. For an example, see the implementation of the remap method.

Constant Summary collapse

# Statuses listed for this strategy. Frozen so the shared array cannot be
# mutated by accident.
# NOTE(review): #status can also return :inconsistent, which is not listed
# here — confirm whether the omission is intentional before relying on this
# list as exhaustive.
STATUSES = [:missing, :ok].freeze

Instance Method Summary collapse

Constructor Details

#initialize(client, index_base_name) ⇒ AliasIndex

Returns a new instance of AliasIndex.



9
10
11
12
13
# File 'lib/elasticity/strategies/alias_index.rb', line 9

# Sets up the strategy around a client and a base index name.
#
# @param client the object every Elasticsearch call made by this strategy is sent to
# @param index_base_name used verbatim as the main alias; the update alias is
#   the same name with "_update" appended
def initialize(client, index_base_name)
  @client, @main_alias = client, index_base_name
  @update_alias = "#{@main_alias}_update"
end

Instance Method Details

#bulk {|b| ... } ⇒ Object

Yields:

  • (b)


223
224
225
226
227
# File 'lib/elasticity/strategies/alias_index.rb', line 223

# Runs a batch of operations through a Bulk::Alias helper.
#
# The helper is built from the client, the update alias and the indexes
# currently returned by #main_indexes. It is handed to the caller's block and
# executed once the block returns.
#
# @yield [b] the Bulk::Alias instance
# @return whatever Bulk::Alias#execute returns
def bulk
  Bulk::Alias.new(@client, @update_alias, main_indexes).tap { |batch| yield batch }.execute
end

#create(index_def) ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/elasticity/strategies/alias_index.rb', line 162

# Creates a new index and points both aliases at it.
#
# @param index_def forwarded to #create_index
# @return whatever the client's index_update_aliases call returns
# @raise [IndexError] when #missing? is false, i.e. the status is anything
#   other than :missing
def create(index_def)
  raise IndexError.new(@main_alias, "index already exists") unless missing?

  new_index = create_index(index_def)
  alias_actions = [@main_alias, @update_alias].map do |alias_name|
    { add: { index: new_index, alias: alias_name } }
  end

  @client.index_update_aliases(body: { actions: alias_actions })
end

#create_if_undefined(index_def) ⇒ Object



176
177
178
# File 'lib/elasticity/strategies/alias_index.rb', line 176

# Creates the index only when #missing? reports it as missing.
#
# @param index_def forwarded to #create
# @return whatever #create returns, or nil when nothing was created
def create_if_undefined(index_def)
  return unless missing?

  create(index_def)
end

#delete ⇒ Object



180
181
182
# File 'lib/elasticity/strategies/alias_index.rb', line 180

# Deletes every index matching the pattern "<main alias>-*".
#
# @return whatever the client's index_delete call returns
def delete
  index_pattern = "#{@main_alias}-*"
  @client.index_delete(index: index_pattern)
end

#delete_by_query(type, body) ⇒ Object



219
220
221
# File 'lib/elasticity/strategies/alias_index.rb', line 219

# Issues a delete-by-query against the main alias.
#
# @param type document type passed through to the client
# @param body query body passed through to the client
# @return whatever the client's delete_by_query call returns
def delete_by_query(type, body)
  @client.delete_by_query(
    index: @main_alias,
    type:  type,
    body:  body
  )
end

#delete_document(type, id) ⇒ Object



203
204
205
206
207
208
209
# File 'lib/elasticity/strategies/alias_index.rb', line 203

# Deletes one document from every index behind either alias.
#
# The union of #main_indexes and #update_indexes is used, so an index that
# sits behind both aliases receives a single delete operation.
#
# @param type document type
# @param id document id
# @return whatever the client's bulk call returns
def delete_document(type, id)
  target_indexes = main_indexes | update_indexes
  delete_ops = target_indexes.map do |index_name|
    { delete: { _index: index_name, _type: type, _id: id } }
  end

  @client.bulk(body: delete_ops)
end

#delete_if_defined ⇒ Object



184
185
186
# File 'lib/elasticity/strategies/alias_index.rb', line 184

# Calls #delete unless #missing? reports the index as missing.
#
# @return whatever #delete returns, or nil when nothing was deleted
def delete_if_defined
  return if missing?

  delete
end

#flush ⇒ Object



229
230
231
# File 'lib/elasticity/strategies/alias_index.rb', line 229

# Asks the client to flush the index (or indexes) behind the update alias.
#
# @return whatever the client's index_flush call returns
def flush
  flushed_alias = @update_alias
  @client.index_flush(index: flushed_alias)
end

#get_document(type, id) ⇒ Object



211
212
213
# File 'lib/elasticity/strategies/alias_index.rb', line 211

# Fetches a single document through the main alias.
#
# @param type document type
# @param id document id
# @return whatever the client's get call returns
def get_document(type, id)
  @client.get(
    index: @main_alias,
    type:  type,
    id:    id
  )
end

#index_document(type, id, attributes) ⇒ Object



193
194
195
196
197
198
199
200
201
# File 'lib/elasticity/strategies/alias_index.rb', line 193

# Indexes a document through the update alias.
#
# @param type document type
# @param id document id sent to the client
# @param attributes document body
# @return [Array] the "_id" and "created" values taken from the client's response
# @raise [IndexError] when the response carries no "_id"
def index_document(type, id, attributes)
  response  = @client.index(index: @update_alias, type: type, id: id, body: attributes)
  stored_id = response["_id"]
  raise IndexError.new(@update_alias, "failed to index document") unless stored_id

  [stored_id, response["created"]]
end

#main_indexes ⇒ Object



150
151
152
153
154
# File 'lib/elasticity/strategies/alias_index.rb', line 150

# Lists the indexes matching "<main alias>-*" that carry the main alias.
#
# @return [Array] the keys of the client's index_get_aliases response, or an
#   empty array when the client raises NotFound
def main_indexes
  index_pattern    = "#{@main_alias}-*"
  aliases_by_index = @client.index_get_aliases(index: index_pattern, name: @main_alias)
  aliases_by_index.keys
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  []
end

#mappings ⇒ Object



241
242
243
244
245
246
247
# File 'lib/elasticity/strategies/alias_index.rb', line 241

# Returns the mappings of the index behind the main alias.
#
# The get-mapping response is keyed by index name. An entry keyed by the alias
# name itself is used when present; otherwise the first entry of the response
# is used, because a lookup through an alias is answered under the concrete
# index name rather than the alias.
# NOTE(review): when the alias covers more than one index (e.g. while #remap
# is running) only the first entry is returned — confirm that is acceptable.
#
# @return the "mappings" value of the selected entry, or nil when the response
#   is empty or the client raises NotFound
def mappings
  response = @client.index_get_mapping(index: @main_alias)
  entry    = response[@main_alias] || response.values.first
  entry && entry["mappings"]
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  nil
end

#missing? ⇒ Boolean

Returns:

  • (Boolean)


146
147
148
# File 'lib/elasticity/strategies/alias_index.rb', line 146

# Tells whether #status reports :missing.
#
# Any other status, including :inconsistent, yields false.
#
# @return [Boolean]
def missing?
  current_status = status
  current_status == :missing
end

#recreate(index_def) ⇒ Object



188
189
190
191
# File 'lib/elasticity/strategies/alias_index.rb', line 188

# Drops the existing index setup, if any, and creates it again.
#
# Calls #delete_if_defined first and then #create, in that order.
#
# @param index_def forwarded unchanged to #create
# @return whatever #create returns
def recreate(index_def)
  delete_if_defined
  create(index_def)
end

#ref_index_name ⇒ Object



15
16
17
# File 'lib/elasticity/strategies/alias_index.rb', line 15

# @return the main alias, i.e. the base index name given to the constructor
def ref_index_name
  @main_alias
end

#remap(index_def) ⇒ Object

Remap allows zero-downtime, zero-data-loss remapping of Elasticsearch indexes. Here is an overview of how it works:

  1. Create a new index with the new mapping.

  2. Update the aliases so that writes go to the new index and reads go to both indexes.

  3. Use scan and scroll to iterate over all the documents in the old index, moving them to the new index.

  4. Update the aliases so that all operations go to the new index.

  5. Delete the old index.

It does a little bit more to ensure consistency and to handle race-conditions. For more details look at the implementation.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/elasticity/strategies/alias_index.rb', line 31

# Moves all documents into a freshly created index built from +index_def+,
# switching the aliases over so reads and writes keep working meanwhile.
#
# Steps, as implemented below:
# 1. Require that the main and update aliases each point at exactly one index
#    and that it is the same index.
# 2. Create the new index; point the update alias at it only, and the main
#    alias at both the old and the new index.
# 3. Scroll over the old index in batches, copying documents that still exist
#    and deleting from the new index any that vanished while being copied.
# 4. Remove the main alias from the old index and delete the old index.
#
# If any StandardError is raised after the new index has been created, the
# aliases are switched back, every document in the new index is copied to the
# old index, the new index is deleted, and the original error is re-raised.
# An error raised by the rollback itself is not handled here.
#
# @param index_def forwarded to #create_index
# @raise [RuntimeError] when the aliases are not in the single-index state
#   described in step 1
def remap(index_def)
  current_main   = main_indexes
  current_update = update_indexes

  if current_main.size != 1 || current_update.size != 1 || current_main != current_update
    raise "Index can't be remapped right now, check if another remapping is already happening"
  end

  new_index      = create_index(index_def)
  original_index = current_main[0]

  begin
    # Configure the aliases so that searches cover both the old and the new index, and writes go to
    # the new index only.
    @client.index_update_aliases(body: {
      actions: [
        { remove: { index: original_index, alias: @update_alias } },
        { add:    { index: new_index, alias: @update_alias } },
        { add:    { index: new_index, alias: @main_alias } },
      ]
    })

    @client.index_flush(index: original_index)
    cursor = @client.search index: original_index, search_type: 'scan', scroll: '1m', _source: false, size: 100
    loop do
      cursor = @client.scroll(scroll_id: cursor['_scroll_id'], scroll: '1m')
      hits   = cursor['hits']['hits']
      break if hits.empty?

      # Fetch documents by the ids that existed when the migration started, so that only documents
      # that have not been deleted in the meantime are migrated.
      id_docs = hits.map do |hit|
        { _index: original_index, _type: hit["_type"], _id: hit["_id"] }
      end

      docs = @client.mget(body: { docs: id_docs }, refresh: true)["docs"]
      break if docs.empty?

      # Copy only the documents that still exist in the old index into the new index.
      copy_ops = docs.select { |doc| doc["found"] }.map do |doc|
        { index: { _index: new_index, _type: doc["_type"], _id: doc["_id"], data: doc["_source"] } }
      end

      # Every document of the batch may have been deleted since the scroll started. Skip the bulk
      # call in that case instead of sending an empty request, matching the guard on the delete
      # pass below.
      @client.bulk(body: copy_ops) unless copy_ops.empty?

      # Deal with race conditions: remove from the new index any document that was copied above
      # but no longer exists in the old index.
      stale_ops = []
      @client.mget(body: { docs: id_docs }, refresh: true)["docs"].each_with_index do |new_doc, idx|
        if docs[idx]["found"] && !new_doc["found"]
          stale_ops << { delete: { _index: new_index, _type: new_doc["_type"], _id: new_doc["_id"] } }
        end
      end

      @client.bulk(body: stale_ops) unless stale_ops.empty?
    end

    # Update the aliases so that they point only at the new index.
    @client.index_update_aliases(body: {
      actions: [
        { remove: { index: original_index, alias: @main_alias } },
      ]
    })
    @client.index_delete(index: original_index)

  rescue
    # Roll back: send writes to the old index again.
    @client.index_update_aliases(body: {
      actions: [
        { add:    { index: original_index, alias: @update_alias } },
        { remove: { index: new_index, alias: @update_alias } },
      ]
    })

    @client.index_flush(index: new_index)
    cursor = @client.search index: new_index, search_type: 'scan', scroll: '1m', size: 100
    loop do
      cursor = @client.scroll(scroll_id: cursor['_scroll_id'], scroll: '1m')
      hits   = cursor['hits']['hits']
      break if hits.empty?

      # Move every document that exists in the new index back to the old index.
      restore_ops = hits.map do |doc|
        { index: { _index: original_index, _type: doc["_type"], _id: doc["_id"], data: doc["_source"] } }
      end

      @client.bulk(body: restore_ops)
    end

    @client.index_flush(index: original_index)
    @client.index_update_aliases(body: {
      actions: [
        { remove: { index: new_index, alias: @main_alias } },
      ]
    })
    @client.index_delete(index: new_index)

    # Re-raise the error that triggered the rollback.
    raise
  end
end

#search(type, body) ⇒ Object



215
216
217
# File 'lib/elasticity/strategies/alias_index.rb', line 215

# Builds a search facade over the main alias.
#
# @param type document type given to the search definition
# @param body search body given to the search definition
# @return [Search::Facade] built from the client and a Search::Definition
def search(type, body)
  definition = Search::Definition.new(@main_alias, type, body)
  Search::Facade.new(@client, definition)
end

#settings ⇒ Object



233
234
235
236
237
238
239
# File 'lib/elasticity/strategies/alias_index.rb', line 233

# Returns the settings of the index behind the main alias.
#
# The get-settings response is keyed by index name. An entry keyed by the
# alias name itself is used when present; otherwise the first entry of the
# response is used, because a lookup through an alias is answered under the
# concrete index name rather than the alias.
# NOTE(review): when the alias covers more than one index (e.g. while #remap
# is running) only the first entry is returned — confirm that is acceptable.
#
# @return the "settings" value of the selected entry, or nil when the response
#   is empty or the client raises NotFound
def settings
  response = @client.index_get_settings(index: @main_alias)
  entry    = response[@main_alias] || response.values.first
  entry && entry["settings"]
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  nil
end

#status ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/elasticity/strategies/alias_index.rb', line 132

# Reports the state of the two aliases.
#
# @return [Symbol] :ok when both the main and the update alias exist,
#   :missing when neither exists, :inconsistent when only one of them exists
def status
  main_present   = @client.index_exists_alias(name: @main_alias)
  update_present = @client.index_exists_alias(name: @update_alias)

  if main_present && update_present
    :ok
  elsif main_present || update_present
    :inconsistent
  else
    :missing
  end
end

#update_indexes ⇒ Object



156
157
158
159
160
# File 'lib/elasticity/strategies/alias_index.rb', line 156

# Lists the indexes matching "<main alias>-*" that carry the update alias.
#
# @return [Array] the keys of the client's index_get_aliases response, or an
#   empty array when the client raises NotFound
def update_indexes
  index_pattern    = "#{@main_alias}-*"
  aliases_by_index = @client.index_get_aliases(index: index_pattern, name: @update_alias)
  aliases_by_index.keys
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  []
end