Class: ESReindex
- Inherits: Object (ESReindex < Object)
- Defined in:
- lib/es-reindex.rb,
lib/es-reindex/railtie.rb,
lib/es-reindex/version.rb,
lib/es-reindex/args_parser.rb
Defined Under Namespace
Classes: ArgsParser, Railtie
Constant Summary
- DEFAULT_URL =
'http://10.203.175.32:9200'
- VERSION =
'0.3.1'
Class Attribute Summary
-
.logger ⇒ Object
Returns the value of attribute logger.
Instance Attribute Summary
-
#dclient ⇒ Object
Returns the value of attribute dclient.
-
#didx ⇒ Object
Returns the value of attribute didx.
-
#done ⇒ Object
Returns the value of attribute done.
-
#dst ⇒ Object
Returns the value of attribute dst.
-
#durl ⇒ Object
Returns the value of attribute durl.
-
#mappings ⇒ Object
Returns the value of attribute mappings.
-
#options ⇒ Object
Returns the value of attribute options.
-
#sclient ⇒ Object
Returns the value of attribute sclient.
-
#settings ⇒ Object
Returns the value of attribute settings.
-
#sidx ⇒ Object
Returns the value of attribute sidx.
-
#src ⇒ Object
Returns the value of attribute src.
-
#start_time ⇒ Object
Returns the value of attribute start_time.
-
#surl ⇒ Object
Returns the value of attribute surl.
Class Method Summary
- .copy!(src, dst, options = {}) ⇒ Object
- .reindex!(src, dst, options = {}) ⇒ Object
Instance Method Summary
- #check_docs ⇒ Object
- #clear_destination ⇒ Object
- #confirm ⇒ Object
- #copy! ⇒ Object
- #copy_docs ⇒ Object
- #create_destination ⇒ Object
- #get_mappings ⇒ Object
- #get_settings ⇒ Object
-
#initialize(src, dst, options = {}) ⇒ ESReindex
constructor
A new instance of ESReindex.
- #okay_to_proceed? ⇒ Boolean
- #setup_index_urls ⇒ Object
Constructor Details
#initialize(src, dst, options = {}) ⇒ ESReindex
Returns a new instance of ESReindex.
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/es-reindex.rb', line 26
# Builds a reindexer that copies the index +src+ into +dst+.
#
# @param src [String, nil] source index, either "url/index" or a bare index
#   name; nil is stored as ''
# @param dst [String, nil] destination index, same format as +src+
# @param options [Hash] merged over the defaults below. It may also carry the
#   callables :if, :unless, :mappings, :settings, :before_create,
#   :after_create, :before_each, :after_each and :after_copy.
# @raise [ArgumentError] if one of those callback options is present but does
#   not respond to #call
def initialize(src, dst, options = {})
  ESReindex.logger ||= Logger.new(STDERR)

  @src = src || ''
  @dst = dst || ''
  @options = {
    from_cli: false,    # Coming from CLI?
    remove: false,      # remove the index in the new location first
    update: false,      # update existing documents (default: only create non-existing)
    frame: 1000,        # specify frame size to be obtained with one fetch during scrolling
    copy_mappings: true # Copy old mappings/settings
  }.merge! options

  # Fail fast on misconfigured callbacks instead of crashing mid-copy.
  %w{
    if unless mappings settings before_create after_create
    before_each after_each after_copy
  }.each do |callback|
    if options[callback.to_sym].present? && !options[callback.to_sym].respond_to?(:call)
      raise ArgumentError, "#{callback} must be a callable object"
    end
  end

  @done = 0
end
Class Attribute Details
.logger ⇒ Object
Returns the value of attribute logger.
203 204 205 |
# File 'lib/es-reindex.rb', line 203
# Class-level logger shared by all reindexers; #initialize assigns
# Logger.new(STDERR) to it when it is still unset.
def logger; @logger; end
Instance Attribute Details
#dclient ⇒ Object
Returns the value of attribute dclient.
10 11 12 |
# File 'lib/es-reindex.rb', line 10
# Destination Elasticsearch client, assigned by #setup_index_urls.
def dclient; @dclient; end
#didx ⇒ Object
Returns the value of attribute didx.
10 11 12 |
# File 'lib/es-reindex.rb', line 10
# Destination index name, parsed out of +dst+ by #setup_index_urls.
def didx; @didx; end
#done ⇒ Object
Returns the value of attribute done.
10 11 12 |
# File 'lib/es-reindex.rb', line 10
# Number of documents queued for the destination so far. #initialize sets it
# to 0 and #copy_docs increments it once per document.
def done; @done; end
#dst ⇒ Object
Returns the value of attribute dst.
10 11 12 |
# File 'lib/es-reindex.rb', line 10
# Destination argument as given to #initialize (nil is stored as '').
def dst; @dst; end
#durl ⇒ Object
Returns the value of attribute durl.
10 11 12 |
# File 'lib/es-reindex.rb', line 10
# Destination cluster base URL, parsed out of +dst+ by #setup_index_urls
# (DEFAULT_URL when +dst+ contains no '/').
def durl; @durl; end
#mappings ⇒ Object
Returns the value of attribute mappings.
10 11 12 |
# File 'lib/es-reindex.rb', line 10
# Mappings used to create the destination index. They are fetched from the
# source by #get_mappings, or produced by options[:mappings] when mappings
# are not copied.
def mappings; @mappings; end
#options ⇒ Object
Returns the value of attribute options.
10 11 12 |
# File 'lib/es-reindex.rb', line 10
# Options hash built by #initialize: the defaults (from_cli, remove, update,
# frame, copy_mappings) merged with the caller-supplied options.
def options
  @options
end
#sclient ⇒ Object
Returns the value of attribute sclient.
10 11 12 |
# File 'lib/es-reindex.rb', line 10
# Source Elasticsearch client, assigned by #setup_index_urls.
def sclient; @sclient; end
#settings ⇒ Object
Returns the value of attribute settings.
10 11 12 |
# File 'lib/es-reindex.rb', line 10
# Settings used to create the destination index. They are fetched from the
# source by #get_settings, or produced by options[:settings] when mappings
# are not copied.
def settings; @settings; end
#sidx ⇒ Object
Returns the value of attribute sidx.
10 11 12 |
# File 'lib/es-reindex.rb', line 10
# Source index name, parsed out of +src+ by #setup_index_urls.
def sidx; @sidx; end
#src ⇒ Object
Returns the value of attribute src.
10 11 12 |
# File 'lib/es-reindex.rb', line 10
# Source argument as given to #initialize (nil is stored as '').
def src; @src; end
#start_time ⇒ Object
Returns the value of attribute start_time.
10 11 12 |
# File 'lib/es-reindex.rb', line 10
# Time at which #copy_docs started; it is the base for progress and E.T.A. logging.
def start_time; @start_time; end
#surl ⇒ Object
Returns the value of attribute surl.
10 11 12 |
# File 'lib/es-reindex.rb', line 10
# Source cluster base URL, parsed out of +src+ by #setup_index_urls
# (DEFAULT_URL when +src+ contains no '/').
def surl; @surl; end
Class Method Details
.copy!(src, dst, options = {}) ⇒ Object
12 13 14 15 16 17 |
# File 'lib/es-reindex.rb', line 12
# One-shot entry point: copy index +src+ into +dst+ (with the source
# index's settings and mappings, since copy_mappings defaults to true).
#
# Builds a reindexer, resolves URLs and clients, and runs #copy! only when
# the :if / :unless guard callbacks allow it.
#
# @param src [String] source index ("url/index" or bare index name)
# @param dst [String] destination index, same format as +src+
# @param options [Hash] forwarded unchanged to #initialize
# @return [ESReindex] the reindexer instance, whether or not it copied
def self.copy!(src, dst, options = {})
  self.new(src, dst, options).tap do |reindexer|
    reindexer.setup_index_urls
    reindexer.copy! if reindexer.okay_to_proceed?
  end
end
.reindex!(src, dst, options = {}) ⇒ Object
19 20 21 22 23 24 |
# File 'lib/es-reindex.rb', line 19
# Like ESReindex.copy!, but forces copy_mappings: false. The destination
# index is therefore created from the options[:mappings] /
# options[:settings] callables rather than from the source index's own
# mappings and settings.
#
# @param src [String] source index ("url/index" or bare index name)
# @param dst [String] destination index, same format as +src+
# @param options [Hash] forwarded to #initialize; :copy_mappings is always
#   overridden to false (merge returns a copy, so the caller's hash is untouched)
# @return [ESReindex] the reindexer instance, whether or not it copied
def self.reindex!(src, dst, options = {})
  self.new(src, dst, options.merge(copy_mappings: false)).tap do |reindexer|
    reindexer.setup_index_urls
    reindexer.copy! if reindexer.okay_to_proceed?
  end
end
Instance Method Details
#check_docs ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/es-reindex.rb', line 183
# Verifies the copy by comparing document counts of source and destination.
#
# Polls both counts once per second for at most 60 seconds, stopping as soon
# as they match. If the window expires, the last observed counts are reported.
#
# @return [Boolean] true when the final source and destination counts are equal
def check_docs
  log 'Checking document count... '
  source_count = 1
  dest_count = 0

  begin
    Timeout.timeout(60) do
      loop do
        source_count = sclient.count(index: sidx)["count"]
        dest_count = dclient.count(index: didx)["count"]
        break if source_count == dest_count
        sleep 1
      end
    end
  rescue Timeout::Error
    # Counts never converged within the window; fall through and report them.
  end

  matched = source_count == dest_count
  log "Document count: #{source_count} = #{dest_count} (#{matched ? 'equal' : 'NOT EQUAL'})"
  matched
end
#clear_destination ⇒ Object
97 98 99 100 101 102 |
# File 'lib/es-reindex.rb', line 97
# Deletes the destination index first when the remove option is set and the
# index exists; otherwise does nothing.
#
# Failures are reported rather than raised, so #copy! can stop cleanly. The
# error is now logged instead of being silently discarded.
#
# @return [Boolean] true on success or no-op, false if a client call raised
def clear_destination
  dclient.indices.delete(index: didx) if remove? && dclient.indices.exists(index: didx)
  true
rescue => e
  log "Failed to remove destination index '#{durl}/#{didx}': #{e.class}: #{e.message}", :error
  false
end
#confirm ⇒ Object
92 93 94 95 |
# File 'lib/es-reindex.rb', line 92
# Interactive pause used by #copy! when running from the CLI. It prints a
# prompt and blocks until one line has been read from standard input.
#
# @return [String] the line that was read
def confirm
  prompt = "Confirm or hit Ctrl-c to abort...\n"
  $stdout.printf(prompt)
  $stdin.readline
end
#copy! ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/es-reindex.rb', line 74
# Runs the whole pipeline: clear destination, create destination, copy
# documents, check counts. It stops at the first step that returns false.
#
# From the CLI it first asks for confirmation and then exits the process
# with status 0 (success) or 1 (failure). Otherwise it returns the result.
#
# @return [Boolean, Object] the pipeline result (only when not from_cli?)
def copy!
  mode_note =
    if remove?
      ' with rewriting destination mapping!'
    elsif update?
      ' with updating existing documents!'
    else
      '.'
    end
  log "Copying '#{surl}/#{sidx}' to '#{durl}/#{didx}'#{mode_note}"
  confirm if from_cli?

  success = clear_destination && create_destination && copy_docs && check_docs
  return success unless from_cli?

  exit(success ? 0 : 1)
end
#copy_docs ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/es-reindex.rb', line 146
# Streams every document of the source index into the destination index.
#
# Opens a scan/scroll cursor on +sidx+ (+frame+ documents per page, '10m'
# keep-alive) and sends each non-empty page as one bulk request. The bulk
# action is 'index' when update? is set and 'create' otherwise. After each
# page, the progress, elapsed time and E.T.A. are logged.
#
# Callbacks, each called with no arguments:
# - options[:before_each] and options[:after_each] run around every document.
# - options[:after_copy] runs once at the end.
#
# @return [true]
def copy_docs
  log "Copying '#{surl}/#{sidx}' to '#{durl}/#{didx}'..."
  @start_time = Time.now
  scroll = sclient.search index: sidx, search_type: "scan", scroll: '10m', size: frame
  total = scroll['hits']['total']
  log "Copy progress: %u/%u (%.1f%%) done.\r" % [done, total, 0]

  action = update? ? 'index' : 'create'

  # Parentheses matter: without them `&&` would bind tighter than `=`.
  while (scroll = sclient.scroll(scroll_id: scroll['_scroll_id'], scroll: '10m')) &&
        !scroll['hits']['hits'].empty?
    bulk = []
    scroll['hits']['hits'].each do |doc|
      options[:before_each].try(:call)
      ### === implement possible modifications to the document
      ### === end modifications to the document
      base = {'_index' => didx, '_id' => doc['_id'], '_type' => doc['_type'], data: doc['_source']}
      bulk << {action => base}
      @done = done + 1
      options[:after_each].try(:call)
    end
    dclient.bulk body: bulk unless bulk.empty?

    # Projected total duration from the average time per document so far.
    eta = total * (Time.now - start_time) / done
    # One format call, so '%' inside interpolated values can't corrupt the template.
    log format("Copy progress: %d/%d (%.1f%%) done in %s. E.T.A.: %s.",
               done, total, 100.0 * done / total, tm_len, start_time + eta)
  end

  log "Copy progress: %u/%u done in %s.\n" % [done, total, tm_len]
  options[:after_copy].try(:call)
  true
end
#create_destination ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/es-reindex.rb', line 104
# Creates the destination index unless it already exists.
#
# Where the settings and mappings come from:
# - With copy_mappings?, they are fetched from the source index by
#   #get_settings / #get_mappings.
# - Otherwise, they come from the options[:mappings] / options[:settings]
#   callables. A callable that was not supplied leaves that part out of the
#   create request instead of raising NoMethodError.
#
# options[:before_create] / options[:after_create] run around the create call.
#
# @return [Boolean] false if fetching source settings or mappings failed,
#   true otherwise (including when the index already existed)
def create_destination
  unless dclient.indices.exists index: didx
    if copy_mappings?
      return false unless get_settings
      return false unless get_mappings
      create_msg = " with settings & mappings from '#{surl}/#{sidx}'"
    else
      @mappings = options[:mappings].try(:call)
      @settings = options[:settings].try(:call)
      create_msg = ""
    end

    options[:before_create].try(:call)

    log "Creating '#{durl}/#{didx}' index#{create_msg}..."
    # Only send the parts we actually have; a null section is not a useful request.
    body = { settings: settings, mappings: mappings }.reject { |_key, value| value.nil? }
    dclient.indices.create index: didx, body: body
    log "Successfully created '#{durl}/#{didx}'#{create_msg}."

    options[:after_create].try(:call)
  end
  true
end
#get_mappings ⇒ Object
138 139 140 141 142 143 144 |
# File 'lib/es-reindex.rb', line 138
# Fetches the source index's mappings and stores them in @mappings.
#
# @return [Hash, false] the mappings on success; false (after logging an
#   error) when the client returns nothing
def get_mappings
  response = sclient.indices.get_mapping(index: sidx)
  if response
    @mappings = response[sidx]["mappings"]
  else
    log "Failed to obtain original index '#{surl}/#{sidx}' mappings!", :error
    false
  end
end
#get_settings ⇒ Object
128 129 130 131 132 133 134 135 136 |
# File 'lib/es-reindex.rb', line 128
# Fetches the source index's settings, strips index.version.created, and
# stores the result in @settings.
#
# Previously the method returned the result of `delete "created"`. That was
# nil whenever the key was absent, so #create_destination wrongly treated the
# fetch as a failure; it also raised when index.version was missing. The
# stripped settings hash is now returned instead.
#
# @return [Hash, false] the settings on success; false (after logging an
#   error) when the client returns nothing
def get_settings
  unless settings = sclient.indices.get_settings(index: sidx)
    log "Failed to obtain original index '#{surl}/#{sidx}' settings!", :error
    return false
  end
  @settings = settings[sidx]["settings"]
  # NOTE(review): presumably dropped because the creation version is assigned
  # by the server, not copied from the source; confirm against the ES version in use.
  version = (@settings["index"] || {})["version"]
  version.delete("created") if version
  @settings
end
#okay_to_proceed? ⇒ Boolean
66 67 68 69 70 71 72 |
# File 'lib/es-reindex.rb', line 66
# Evaluates the optional guard callbacks against the two clients.
#
# options[:if] must return truthy for the copy to proceed. options[:unless]
# must return falsy, and it is only consulted when the :if result (or its
# default of true) was truthy. A skip is logged.
#
# @return [Object] truthy to proceed, falsy to skip
def okay_to_proceed?
  if_guard = options[:if]
  unless_guard = options[:unless]

  result = if_guard.present? ? if_guard.call(sclient, dclient) : true
  result &&= !unless_guard.call(sclient, dclient) if unless_guard.present?

  log 'Skipping action due to guard callbacks' unless result
  result
end
#setup_index_urls ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/es-reindex.rb', line 50
# Splits +src+ and +dst+ into a base URL and an index name, then builds one
# Elasticsearch client per side.
#
# An argument like "http://host:9200/index" is split at its last '/' into
# "http://host:9200" and "index". An argument without any '/' is taken as a
# bare index name on DEFAULT_URL.
def setup_index_urls
  endpoints = [src, dst].map do |param|
    parts = %r{^(.*)/(.*?)$}.match(param)
    # dup so the stored URL/index never alias DEFAULT_URL or the raw argument.
    parts ? [parts[1], parts[2]] : [DEFAULT_URL.dup, param.dup]
  end
  (@surl, @sidx), (@durl, @didx) = endpoints

  @sclient = Elasticsearch::Client.new host: surl
  @dclient = Elasticsearch::Client.new host: durl
end