Module: Chewy::RakeHelper
- Defined in:
- lib/chewy/rake_helper.rb
Constant Summary collapse
- IMPORT_CALLBACK =
# Subscriber body for import notifications.
# Prints one summary line per import (index, rounded-up duration, per-action
# counts) and then, when the payload carries errors, each error grouped by
# action together with the affected documents.
lambda do |output, _name, start, finish, _id, payload|
  elapsed = (finish - start).ceil
  import_stats = payload.fetch(:import, {})
  stats_line = import_stats.map { |key, count| "#{key} #{count}" }.join(', ')
  output.puts " Imported #{payload[:index]} in #{human_duration(elapsed)}, stats: #{stats_line}"

  # Errors are optional in the payload; nothing is printed when they are absent.
  payload[:errors]&.each do |action, failures|
    output.puts " #{action.to_s.humanize} errors:"
    failures.each do |message, documents|
      output.puts " `#{message}`"
      output.puts " on #{documents.count} documents: #{documents}"
    end
  end
end
- JOURNAL_CALLBACK =
# Subscriber body for journal-application notifications.
# Reports which targets are being processed (sorted by derivable name), the
# total number of grouped entries and the current stage.
lambda do |output, _, _, _, _, payload|
  groups = payload[:groups]
  entries_count = groups.values.sum(&:size)
  sorted_targets = groups.keys.sort_by(&:derivable_name)
  output.puts " Applying journal to #{sorted_targets}, #{entries_count} entries, stage #{payload[:stage]}"
end
- DELETE_BY_QUERY_OPTIONS =
%w[WAIT_FOR_COMPLETION REQUESTS_PER_SECOND SCROLL_SIZE].freeze
- FALSE_VALUES =
%w[0 f false off].freeze
Class Method Summary collapse
-
.all_indexes ⇒ Array<Chewy::Index>
Eager loads and returns all the indexes defined in the application except Chewy::Stash::Specification and Chewy::Stash::Journal.
- .create_missing_indexes!(output: $stdout, env: ENV) ⇒ Object
-
.delete_by_query_options_from_env(env) ⇒ Object
Reads options that are required to run journal cleanup asynchronously from ENV hash.
-
.journal_apply(time: nil, only: nil, except: nil, output: $stdout) ⇒ Array<Chewy::Index>
Applies changes that were done after the specified time for the specified indexes or all of them.
-
.journal_clean(time: nil, only: nil, except: nil, delete_by_query_options: {}, output: $stdout) ⇒ Array<Chewy::Index>
Removes journal records created before the specified timestamp for the specified indexes or all of them.
-
.journal_create(output: $stdout) ⇒ Object
Creates journal index.
- .normalize_index(identifier) ⇒ Object
- .normalize_indexes(*identifiers) ⇒ Object
-
.reindex(source:, dest:, output: $stdout) ⇒ Object
Reindex data from source index to destination index.
-
.reset(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs zero-downtime reindexing of all documents for the specified indexes.
- .subscribed_task_stats(output = $stdout, &block) ⇒ Object
-
.sync(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs synchronization for each passed index if it exists.
-
.update(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs full update for each passed type if the corresponding index exists.
-
.update_mapping(name:, output: $stdout) ⇒ Object
Adds new fields to an existing data stream or index.
-
.upgrade(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs zero-downtime reindexing of all documents for the specified indexes only if a particular index specification was changed.
Class Method Details
.all_indexes ⇒ Array<Chewy::Index>
Eager loads and returns all the indexes defined in the application except Chewy::Stash::Specification and Chewy::Stash::Journal.
217 218 219 220 |
# File 'lib/chewy/rake_helper.rb', line 217
# Eager loads Chewy and returns every Chewy::Index descendant, leaving out
# the two stash indexes (journal and specification).
def all_indexes
  Chewy.eager_load!
  descendants = Chewy::Index.descendants
  descendants - [Chewy::Stash::Journal, Chewy::Stash::Specification]
end
.create_missing_indexes!(output: $stdout, env: ENV) ⇒ Object
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 |
# File 'lib/chewy/rake_helper.rb', line 271
# Creates every index that does not exist yet.
#
# The journal stash index is only considered when journaling is enabled in
# Chewy.configuration. Indexes that already exist are skipped; the skip is
# reported only when env['VERBOSE'] is set.
#
# @param output [IO] stream the progress messages are written to
# @param env [#[]] environment lookup consulted for the VERBOSE flag
def create_missing_indexes!(output: $stdout, env: ENV)
  subscribed_task_stats(output) do
    Chewy.eager_load!

    candidates = Chewy::Index.descendants
    candidates -= [Chewy::Stash::Journal] unless Chewy.configuration[:journal]

    candidates.each do |index|
      if index.exists?
        output.puts "#{index.name} already exists, skipping" if env['VERBOSE']
      else
        index.create!
        output.puts "#{index.name} index successfully created"
      end
    end
  end
end
.delete_by_query_options_from_env(env) ⇒ Object
Reads options that are required to run journal cleanup asynchronously from ENV hash.
258 259 260 261 262 263 264 265 266 267 268 269 |
# File 'lib/chewy/rake_helper.rb', line 258
# Reads the delete-by-query options used for journal cleanup from an
# ENV-like hash and converts them to typed values.
#
# Only the keys listed in DELETE_BY_QUERY_OPTIONS are considered; each key is
# downcased and symbolized:
# - :wait_for_completion is false when the value (case-insensitively) is one
#   of FALSE_VALUES, true otherwise
# - :requests_per_second is converted with to_f
# - :scroll_size is converted with to_i
#
# @param env [#slice] string-keyed environment (e.g. ENV or a Hash)
# @return [Hash{Symbol => Object}] the options present in env, typed as above
def delete_by_query_options_from_env(env)
  env
    .slice(*DELETE_BY_QUERY_OPTIONS)
    .transform_keys { |k| k.downcase.to_sym }
    .to_h do |key, value|
      case key
      when :wait_for_completion then [key, !FALSE_VALUES.include?(value.downcase)]
      when :requests_per_second then [key, value.to_f]
      when :scroll_size then [key, value.to_i]
      end
    end
end
.journal_apply(time: nil, only: nil, except: nil, output: $stdout) ⇒ Array<Chewy::Index>
Applies changes that were done after the specified time for the specified indexes or all of them.
163 164 165 166 167 168 169 170 171 |
# File 'lib/chewy/rake_helper.rb', line 163
# Applies journal entries created after the given time for the selected
# indexes and reports when nothing was found.
#
# @param time [Object] moment to start from; required
# @param only [Object] selection passed through to journal_indexes_from
# @param except [Object] exclusion passed through to journal_indexes_from
# @param output [IO] stream the progress messages are written to
# @raise [ArgumentError] when time is not given
def journal_apply(time: nil, only: nil, except: nil, output: $stdout)
  raise ArgumentError, 'Please specify the time to start with' unless time

  subscribed_task_stats(output) do
    output.puts "Applying journal entries created after #{time}"
    journal = Chewy::Journal.new(journal_indexes_from(only: only, except: except))
    applied = journal.apply(time)
    output.puts 'No journal entries were created after the specified time' if applied.zero?
  end
end
.journal_clean(time: nil, only: nil, except: nil, delete_by_query_options: {}, output: $stdout) ⇒ Array<Chewy::Index>
Removes journal records created before the specified timestamp for the specified indexes or all of them.
187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/chewy/rake_helper.rb', line 187
# Removes journal records for the selected indexes, optionally limited to
# those created before the given time.
#
# When the response contains a 'task' key, the created task is reported;
# otherwise the number of deleted entries is printed, read from
# response['deleted'] or, failing that, response['_indices']['_all']['deleted'].
#
# @param time [Object, nil] upper bound handed to Chewy::Journal#clean
# @param only [Object] selection passed through to journal_indexes_from
# @param except [Object] exclusion passed through to journal_indexes_from
# @param delete_by_query_options [Hash] options forwarded to Chewy::Journal#clean
# @param output [IO] stream the progress messages are written to
def journal_clean(time: nil, only: nil, except: nil, delete_by_query_options: {}, output: $stdout)
  subscribed_task_stats(output) do
    output.puts "Cleaning journal entries created before #{time}" if time

    journal = Chewy::Journal.new(journal_indexes_from(only: only, except: except))
    # Forward the caller's options explicitly, in the same `key: value` form
    # used for `only:` and `except:` above.
    response = journal.clean(time, delete_by_query_options: delete_by_query_options)

    if response.key?('task')
      output.puts "Task to cleanup the journal has been created, #{response['task']}"
    else
      count = response['deleted'] || response['_indices']['_all']['deleted']
      output.puts "Cleaned up #{count} journal entries"
    end
  end
end
.journal_create(output: $stdout) ⇒ Object
Creates journal index.
207 208 209 210 211 |
# File 'lib/chewy/rake_helper.rb', line 207
# Creates the journal stash index, wrapped in the shared task statistics output.
#
# @param output [IO] stream the statistics are written to
def journal_create(output: $stdout)
  subscribed_task_stats(output) { Chewy::Stash::Journal.create! }
end
.normalize_index(identifier) ⇒ Object
293 294 295 296 297 |
# File 'lib/chewy/rake_helper.rb', line 293
# Resolves an identifier to an index class.
#
# A class inheriting from Chewy::Index is returned untouched; anything else
# is stringified, camelized, suffixed with "Index" and constantized.
#
# @param identifier [Class, #to_s] index class or a name to resolve
def normalize_index(identifier)
  if identifier.is_a?(Class) && identifier < Chewy::Index
    identifier
  else
    index_class_name = "#{identifier.to_s.camelize}Index"
    index_class_name.constantize
  end
end
.normalize_indexes(*identifiers) ⇒ Object
289 290 291 |
# File 'lib/chewy/rake_helper.rb', line 289
# Resolves several identifiers at once via normalize_index.
# Accepts either separate arguments or arrays of identifiers (flattened by
# one level).
#
# @param identifiers [Array] identifiers or arrays of identifiers
# @return [Array] the result of normalize_index for each identifier
def normalize_indexes(*identifiers)
  flat_identifiers = identifiers.flatten(1)
  flat_identifiers.map { |item| normalize_index(item) }
end
.reindex(source:, dest:, output: $stdout) ⇒ Object
Reindexes data from the source index to the destination index.
228 229 230 231 232 233 234 |
# File 'lib/chewy/rake_helper.rb', line 228
# Delegates to Chewy::Index.reindex for the given source and destination,
# announcing both before the call and reporting success after it.
#
# @param source [Object] source index, forwarded as-is
# @param dest [Object] destination index, forwarded as-is
# @param output [IO] stream the progress messages are written to
def reindex(source:, dest:, output: $stdout)
  subscribed_task_stats(output) do
    announcement = "Source index is #{source}\nDestination index is #{dest}"
    output.puts announcement

    Chewy::Index.reindex(source: source, dest: dest)

    confirmation = "#{source} index successfully reindexed with #{dest} index data"
    output.puts confirmation
  end
end
.reset(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs zero-downtime reindexing of all documents for the specified indexes.
40 41 42 43 44 45 46 47 48 |
# File 'lib/chewy/rake_helper.rb', line 40
# Resets each selected index via reset_one.
# warn_missing_index is invoked first, outside of the statistics wrapper.
#
# @param only [Object] selection passed through to indexes_from
# @param except [Object] exclusion passed through to indexes_from
# @param parallel [Object] forwarded to reset_one
# @param output [IO] stream the progress messages are written to
def reset(only: nil, except: nil, parallel: nil, output: $stdout)
  warn_missing_index(output)

  subscribed_task_stats(output) do
    targets = indexes_from(only: only, except: except)
    targets.each { |index| reset_one(index, output, parallel: parallel) }
  end
end
.subscribed_task_stats(output = $stdout, &block) ⇒ Object
299 300 301 302 303 304 305 306 |
# File 'lib/chewy/rake_helper.rb', line 299
# Runs the block while subscribed to the 'apply_journal.chewy' and
# 'import_objects.chewy' notifications, routing both callbacks to output.
# The total elapsed time is always printed, even when the block raises.
#
# @param output [IO] stream the callbacks and the total are written to
# @yield the work to run under the subscriptions
def subscribed_task_stats(output = $stdout, &block)
  start = Time.now
  # Currying binds output as the first lambda argument; the remaining
  # arguments are supplied by the notification system.
  journal_listener = JOURNAL_CALLBACK.curry[output]
  import_listener = IMPORT_CALLBACK.curry[output]

  ActiveSupport::Notifications.subscribed(journal_listener, 'apply_journal.chewy') do
    ActiveSupport::Notifications.subscribed(import_listener, 'import_objects.chewy', &block)
  end
ensure
  output.puts "Total: #{human_duration(Time.now - start)}"
end
.sync(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs synchronization for each passed index if it exists.
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/chewy/rake_helper.rb', line 129
# Synchronizes each selected index and collects the ones for which the sync
# reported a positive document count.
#
# For every index the outcome is printed: a failure notice when the sync
# returns a falsy result, the missing/outdated documents when something was
# synchronized, or a skip notice otherwise, followed by the time taken.
#
# @param only [Object] selection passed through to indexes_from
# @param except [Object] exclusion passed through to indexes_from
# @param parallel [Object] forwarded to the index's sync
# @param output [IO] stream the progress messages are written to
def sync(only: nil, except: nil, parallel: nil, output: $stdout)
  subscribed_task_stats(output) do
    indexes_from(only: only, except: except).each_with_object([]) do |index, synced|
      output.puts "Synchronizing #{index}"
      output.puts " #{index} doesn't support outdated synchronization" unless index.supports_outdated_sync?

      started_at = Time.now
      result = index.sync(parallel: parallel)

      if result
        if result[:count].positive?
          output.puts " Missing documents: #{result[:missing]}" if result[:missing].present?
          output.puts " Outdated documents: #{result[:outdated]}" if result[:outdated].present?
          synced << index
        else
          output.puts " Skipping #{index}, up to date"
        end
      else
        output.puts " Something went wrong with the #{index} synchronization"
      end

      output.puts " Took #{human_duration(Time.now - started_at)}"
    end
  end
end
.update(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs full update for each passed type if the corresponding index exists.
103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/chewy/rake_helper.rb', line 103
# Imports data into each selected index that already exists and collects the
# updated ones; indexes that do not exist are skipped with a hint to use the
# reset task.
#
# @param only [Object] selection passed through to indexes_from
# @param except [Object] exclusion passed through to indexes_from
# @param parallel [Object] forwarded to the index's import
# @param output [IO] stream the progress messages are written to
def update(only: nil, except: nil, parallel: nil, output: $stdout)
  subscribed_task_stats(output) do
    indexes_from(only: only, except: except).each_with_object([]) do |index, updated|
      unless index.exists?
        output.puts "Skipping #{index}, it does not exists (use rake chewy:reset[#{index.derivable_name}] to create and update it)"
        next
      end

      output.puts "Updating #{index}"
      index.import(parallel: parallel)
      updated << index
    end
  end
end
.update_mapping(name:, output: $stdout) ⇒ Object
Adds new fields to an existing data stream or index. Change the search settings of existing fields.
243 244 245 246 247 248 249 |
# File 'lib/chewy/rake_helper.rb', line 243
# Resolves the named index with normalize_index and calls update_mapping on
# it, reporting the name before and success after.
#
# @param name [Class, #to_s] index identifier accepted by normalize_index
# @param output [IO] stream the progress messages are written to
def update_mapping(name:, output: $stdout)
  subscribed_task_stats(output) do
    output.puts "Index name is #{name}"
    index = normalize_index(name)
    index.update_mapping
    output.puts "#{name} index successfully updated"
  end
end
.upgrade(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs zero-downtime reindexing of all documents for the specified indexes only if a particular index specification was changed.
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/chewy/rake_helper.rb', line 65
# Resets only those selected indexes whose specification reports a change.
#
# Unchanged indexes are listed as skipped; when nothing changed at all, a
# single notice is printed instead. The block's final value is the list of
# changed indexes.
#
# @param only [Object] selection passed through to indexes_from
# @param except [Object] exclusion passed through to indexes_from
# @param parallel [Object] forwarded to reset_one
# @param output [IO] stream the progress messages are written to
def upgrade(only: nil, except: nil, parallel: nil, output: $stdout)
  warn_missing_index(output)

  subscribed_task_stats(output) do
    indexes = indexes_from(only: only, except: except)
    changed = indexes.select { |index| index.specification.changed? }

    if changed.present?
      indexes.each do |index|
        unless changed.include?(index)
          output.puts "Skipping #{index}, the specification didn't change"
          next
        end

        reset_one(index, output, parallel: parallel)
      end
    else
      output.puts 'No index specification was changed'
    end

    changed
  end
end