Class: EsDumpRestore::EsClient
- Inherits:
-
Object
- Object
- EsDumpRestore::EsClient
- Defined in:
- lib/es_dump_restore/es_client.rb
Constant Summary collapse
- SLEEP_BETWEEN_RETRIES =
5
Instance Attribute Summary collapse
-
#base_uri ⇒ Object
Returns the value of attribute base_uri.
-
#index_name ⇒ Object
Returns the value of attribute index_name.
Instance Method Summary collapse
- #bulk_index(data) ⇒ Object
- #check_alias(alias_name) ⇒ Object
- #create_index(metadata, overrides) ⇒ Object
- #each_scroll_hit(scroll_id, &block) ⇒ Object
-
#initialize(base_uri, index_name, type, exception_retries = 1) ⇒ EsClient
constructor
A new instance of EsClient.
- #mappings ⇒ Object
- #replace_alias_and_close_old_index(alias_name) ⇒ Object
- #settings ⇒ Object
- #start_scan {|scroll_id, total| ... } ⇒ Object
Constructor Details
#initialize(base_uri, index_name, type, exception_retries = 1) ⇒ EsClient
Returns a new instance of EsClient.
12 13 14 15 16 17 18 19 |
# File 'lib/es_dump_restore/es_client.rb', line 12

# Builds a client whose requests are scoped to one index and, optionally,
# one type within that index.
#
# @param base_uri [String] root URI of the server; kept in both @es_uri and
#   @base_uri
# @param index_name [String] index that request paths are prefixed with
# @param type [String, nil] optional type; when nil the path prefix is the
#   index name alone, otherwise "<index_name>/<type>"
# @param exception_retries [Integer] retry budget read by #bulk_index when an
#   HTTPClient::TimeoutError is raised
def initialize(base_uri, index_name, type, exception_retries = 1)
  @httpclient = HTTPClient.new
  @index_name = index_name
  # @es_uri is the name this constructor has always populated; @base_uri is
  # what the #base_uri reader returns. Assign both so the reader no longer
  # yields nil.
  @es_uri = base_uri
  @base_uri = base_uri
  @path_prefix = type.nil? ? index_name : "#{index_name}/#{type}"
  @exception_retries = exception_retries
end
Instance Attribute Details
#base_uri ⇒ Object
Returns the value of attribute base_uri.
9 10 11 |
# File 'lib/es_dump_restore/es_client.rb', line 9 def base_uri @base_uri end |
#index_name ⇒ Object
Returns the value of attribute index_name.
10 11 12 |
# File 'lib/es_dump_restore/es_client.rb', line 10 def index_name @index_name end |
Instance Method Details
#bulk_index(data) ⇒ Object
133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/es_dump_restore/es_client.rb', line 133

# POSTs +data+ to "<path_prefix>/_bulk", retrying on client timeouts.
#
# A HTTPClient::TimeoutError is retried up to @exception_retries times, with
# a SLEEP_BETWEEN_RETRIES second pause before each retry; once the budget is
# used up the timeout is re-raised.
#
# @param data [Object] request body handed to #request unchanged
# @return [Object] whatever #request returns for the successful attempt
# @raise [HTTPClient::TimeoutError] when every attempt timed out
def bulk_index(data)
  attempts = 0
  begin
    request(:post, "#{@path_prefix}/_bulk", :body => data)
  rescue HTTPClient::TimeoutError => e
    # Out of retries: surface the timeout to the caller.
    raise e unless attempts < @exception_retries

    attempts += 1
    puts "Retrying (#{attempts} of #{@exception_retries}) '#{@path_prefix}/_bulk'"
    # Pause before retrying so we aren't hammering the server.
    sleep SLEEP_BETWEEN_RETRIES
    retry
  end
end
#check_alias(alias_name) ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/es_dump_restore/es_client.rb', line 94

# Checks that it's possible to do an atomic restore using the given alias
# name. Neither +index_name+ nor +alias_name+ may appear as a key in the
# "_aliases" response, i.e. neither may name an existing index.
#
# @param alias_name [String] alias intended for the restored index
# @return [nil]
# @raise [RuntimeError] when either name is already taken by an index
def check_alias(alias_name)
  alias_listing = request(:get, "_aliases")

  # The target index name is checked first, then the alias name.
  [index_name, alias_name].each do |candidate|
    raise "There is already an index called #{candidate}" if alias_listing.include?(candidate)
  end

  nil
end
#create_index(metadata, overrides) ⇒ Object
86 87 88 89 90 91 92 |
# File 'lib/es_dump_restore/es_client.rb', line 86

# Creates the index by POSTing its metadata, serialised as JSON, to the
# path prefix.
#
# @param metadata [Hash] index definition to send
# @param overrides [String, nil] optional JSON document; when given it is
#   parsed and deep-merged into +metadata+ before sending
# @return [Object] whatever #request returns
def create_index(metadata, overrides)
  if overrides
    overrides = MultiJson.load(overrides)
    metadata = deep_merge(metadata, overrides)
  end

  request(:post, "#{@path_prefix}", :body => MultiJson.dump(metadata))
end
#each_scroll_hit(scroll_id, &block) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/es_dump_restore/es_client.rb', line 60

# Pages through a scroll, yielding every hit to the caller's block.
#
# Iteration stops when a response has no "hits" section, when its inner
# "hits" list is missing or empty, or once the number of hits yielded
# reaches the reported "total".
#
# @param scroll_id [String] id of the scroll to start from
# @yield [hit] each element of the response's hits.hits list
# @return [nil]
def each_scroll_hit(scroll_id, &block)
  done = 0
  loop do
    # NOTE(review): the trailing [404] is passed through to #request;
    # presumably a list of tolerated status codes — confirm against #request.
    batch = request(:get, '_search/scroll', {
      query: {
        version: true,
        scroll: '10m',
        scroll_id: scroll_id
      }
    }, [404])

    batch_hits = batch["hits"]
    break if batch_hits.nil?

    hits = batch_hits["hits"]
    # A response without an inner hits list is treated like an empty page.
    break if hits.nil? || hits.empty?

    # Each scroll response may carry a new scroll id and only the most
    # recent one should be used for the next request; keep the previous id
    # when the response does not include one.
    scroll_id = batch["_scroll_id"] || scroll_id

    hits.each do |hit|
      yield hit
    end

    total = batch_hits["total"]
    done += hits.size
    break if done >= total
  end
end
#mappings ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/es_dump_restore/es_client.rb', line 21

# Fetches "<path_prefix>/_mapping" and returns the mapping definition.
#
# The response must contain exactly one top-level value. If that value has
# a "mappings" entry, the entry is returned; otherwise the value itself is.
#
# @return [Object] the mapping definition
# @raise [RuntimeError] when the response does not hold exactly one value
def mappings
  response = request(:get, "#{@path_prefix}/_mapping")
  entries = response.values
  raise "Unexpected response: #{response}" unless entries.size == 1

  entry = entries.first
  nested = entry["mappings"]
  nested.nil? ? entry : nested
end
#replace_alias_and_close_old_index(alias_name) ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/es_dump_restore/es_client.rb', line 108

# Points +alias_name+ at this client's index and closes every index that
# previously carried the alias.
#
# The alias switch is sent as a single "_aliases" request containing one
# "remove" action per previously aliased index followed by one "add" action
# for +index_name+; the old indices are closed afterwards, one request each.
#
# @param alias_name [String] alias to move
# @return [Array<String>] names of the indices that previously had the alias
def replace_alias_and_close_old_index(alias_name)
  # Response of the form:
  # { "index_name" => { "aliases" => { "a1" => {}, "a2" => {} } } }
  alias_map = request(:get, "_aliases")

  # We would normally expect 0 or 1 index with this alias, but several is
  # valid too.
  previous_indices = alias_map.select do |_name, details|
    details.fetch("aliases", {}).keys.include?(alias_name)
  end.keys

  removals = previous_indices.map do |stale_index|
    { "remove" => { "index" => stale_index, "alias" => alias_name } }
  end
  addition = { "add" => { "index" => index_name, "alias" => alias_name } }

  request(:post, "_aliases", :body => MultiJson.dump({ "actions" => removals + [addition] }))

  previous_indices.each { |stale_index| request(:post, "#{stale_index}/_close") }
end
#settings ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/es_dump_restore/es_client.rb', line 34

# Fetches "<index_name>/_settings" and returns the settings definition.
#
# Note that the path is built from @index_name, not from the path prefix.
# The response must contain exactly one top-level value. If that value has
# a "settings" entry, the entry is returned; otherwise the value itself is.
#
# @return [Object] the settings definition
# @raise [RuntimeError] when the response does not hold exactly one value
def settings
  response = request(:get, "#{@index_name}/_settings")
  entries = response.values
  raise "Unexpected response: #{response}" if entries.size != 1

  entry = entries.first
  inner = entry["settings"]
  return entry if inner.nil?

  inner
end
#start_scan {|scroll_id, total| ... } ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/es_dump_restore/es_client.rb', line 47

# Opens a scan-type scroll over "<path_prefix>/_search" with a match_all
# query and hands the scroll id and total hit count to the block.
#
# @yield [scroll_id, total] the "_scroll_id" and "hits" => "total" values
#   taken from the search response
# @return [Object] the block's return value
def start_scan(&block)
  search_params = {
    search_type: 'scan',
    scroll: '10m',
    size: 500,
    version: true
  }
  search_body = MultiJson.dump({
    fields: ['_source', '_timestamp', '_routing', '_percolate', '_parent', '_ttl'],
    query: { match_all: {} }
  })

  response = request(:get, "#{@path_prefix}/_search", query: search_params, body: search_body)

  hit_count = response["hits"]["total"]
  cursor = response["_scroll_id"]
  yield cursor, hit_count
end