Class: EsDumpRestore::EsClient

Inherits:
Object
  • Object
show all
Defined in:
lib/es_dump_restore/es_client.rb

Constant Summary collapse

SLEEP_BETWEEN_RETRIES =
5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(base_uri, index_name, type, exception_retries = 1) ⇒ EsClient

Returns a new instance of EsClient.



12
13
14
15
16
17
18
19
# File 'lib/es_dump_restore/es_client.rb', line 12

def initialize(base_uri, index_name, type, exception_retries=1)
  @httpclient = HTTPClient.new
  @index_name = index_name

  @es_uri = base_uri
  @path_prefix = type.nil? ? index_name : index_name + "/" + type
  @exception_retries = exception_retries
end

Instance Attribute Details

#base_uriObject

Returns the value of attribute base_uri.



9
10
11
# File 'lib/es_dump_restore/es_client.rb', line 9

def base_uri
  @base_uri
end

#index_nameObject

Returns the value of attribute index_name.



10
11
12
# File 'lib/es_dump_restore/es_client.rb', line 10

def index_name
  @index_name
end

Instance Method Details

#bulk_index(data) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/es_dump_restore/es_client.rb', line 133

def bulk_index(data)
  retries = 0
  begin
    request(:post, "#{@path_prefix}/_bulk", :body => data)
  rescue HTTPClient::TimeoutError => e
    if retries < @exception_retries
      retries += 1
      puts "Retrying (#{retries} of #{@exception_retries}) '#{@path_prefix}/_bulk'"
      sleep SLEEP_BETWEEN_RETRIES # add a sleep here so we aren't hammering the server
      retry
    end
    raise e
  end
end

#check_alias(alias_name) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/es_dump_restore/es_client.rb', line 94

def check_alias(alias_name)
  # Checks that it's possible to do an atomic restore using the given alias
  # name.  This requires that:
  #  - `alias_name` doesn't point to an existing index
  #  - `index_name` doesn't point to an existing index
  existing = request(:get, "_aliases")
  if existing.include? index_name
    raise "There is already an index called #{index_name}"
  end
  if existing.include? alias_name
    raise "There is already an index called #{alias_name}"
  end
end

#create_index(metadata, overrides) ⇒ Object



86
87
88
89
90
91
92
# File 'lib/es_dump_restore/es_client.rb', line 86

def create_index(, overrides)
  if overrides
    overrides = MultiJson.load(overrides)
     = deep_merge(, overrides)
  end
  request(:post, "#{@path_prefix}", :body => MultiJson.dump())
end

#each_scroll_hit(scroll_id, &block) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/es_dump_restore/es_client.rb', line 60

def each_scroll_hit(scroll_id, &block)
  done = 0
  loop do
    batch = request(:get, '_search/scroll', {
      query: {
        version: true,
        scroll: '10m',
        scroll_id: scroll_id
      }
    }, [404])

    batch_hits = batch["hits"]
    break if batch_hits.nil?
    hits = batch_hits["hits"]
    break if hits.empty?

    hits.each do |hit|
      yield hit
    end

    total = batch_hits["total"]
    done += hits.size
    break if done >= total
  end
end

#mappingsObject



21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/es_dump_restore/es_client.rb', line 21

def mappings
  data = request(:get, "#{@path_prefix}/_mapping")
  if data.values.size != 1
    raise "Unexpected response: #{data}"
  end
  mappings = data.values.first
  if mappings["mappings"].nil?
    mappings
  else
    mappings["mappings"]
  end
end

#replace_alias_and_close_old_index(alias_name) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/es_dump_restore/es_client.rb', line 108

def replace_alias_and_close_old_index(alias_name)
  existing = request(:get, "_aliases")

  # Response of the form:
  #   { "index_name" => { "aliases" => { "a1" => {}, "a2" => {} } } }
  old_aliased_indices = existing.select { |name, details|
    details.fetch("aliases", {}).keys.include? alias_name
  }
  old_aliased_indices = old_aliased_indices.keys

  # For any existing indices with this alias, remove the alias
  # We would normally expect 0 or 1 such index, but several is
  # valid too
  actions = old_aliased_indices.map { |old_index_name|
    { "remove" => { "index" => old_index_name, "alias" => alias_name } }
  }

  actions << { "add" => { "index" => index_name, "alias" => alias_name } }

  request(:post, "_aliases", :body => MultiJson.dump({ "actions" => actions }))
  old_aliased_indices.each do |old_index_name|
    request(:post, "#{old_index_name}/_close")
  end
end

#settingsObject



34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/es_dump_restore/es_client.rb', line 34

def settings
  data = request(:get, "#{@index_name}/_settings")
  if data.values.size != 1
    raise "Unexpected response: #{data}"
  end
  settings = data.values.first
  if settings["settings"].nil?
    settings
  else
    settings["settings"]
  end
end

#start_scan {|scroll_id, total| ... } ⇒ Object

Yields:

  • (scroll_id, total)


47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/es_dump_restore/es_client.rb', line 47

def start_scan(&block)
  scroll = request(:get, "#{@path_prefix}/_search",
    query: { search_type: 'scan', scroll: '10m', size: 500, version: true },
    body: MultiJson.dump({
      fields: ['_source', '_timestamp', '_routing', '_percolate', '_parent', '_ttl'],
      query: { match_all: {} } }
    ))
  total = scroll["hits"]["total"]
  scroll_id = scroll["_scroll_id"]

  yield scroll_id, total
end