Module: Chewy::RakeHelper
- Defined in:
- lib/chewy/rake_helper.rb
Constant Summary collapse
# Subscriber callback that reports a single import notification.
#
# The arguments are (output, name, start, finish, id, payload); +output+
# comes first so the lambda can be curried with the target IO before it is
# handed over as a subscriber. It prints one summary line built from
# +payload[:index]+ and +payload[:import]+ and, when +payload[:errors]+ is
# not nil, one section per action listing each error together with the
# documents it affected.
#
# NOTE(review): +human_duration+ is defined outside this excerpt and
# +humanize+ is presumably the ActiveSupport String extension.
IMPORT_CALLBACK = lambda do |output, _name, start, finish, _id, payload|
  # Elapsed time is rounded up before being formatted.
  elapsed = (finish - start).ceil
  counters = payload.fetch(:import, {})
  summary = counters.map { |key, count| "#{key} #{count}" }.join(', ')
  output.puts " Imported #{payload[:index]} in #{human_duration(elapsed)}, stats: #{summary}"

  failures = payload[:errors]
  unless failures.nil?
    failures.each do |action, errors|
      output.puts " #{action.to_s.humanize} errors:"
      errors.each do |error, documents|
        output.puts " `#{error}`"
        output.puts " on #{documents.count} documents: #{documents}"
      end
    end
  end
end
# Subscriber callback that reports a single journal-apply notification.
#
# Only the first argument (+output+) and the last one (+payload+) are used.
# +payload[:groups]+ is read as a hash whose values respond to +size+ and
# whose keys respond to +derivable_name+; +payload[:stage]+ is printed as is.
JOURNAL_CALLBACK = lambda do |output, _, _, _, _, payload|
  groups = payload[:groups]
  # Total number of entries across every group.
  entry_total = groups.sum { |_target, entries| entries.size }
  ordered_targets = groups.keys.sort_by(&:derivable_name)
  output.puts " Applying journal to #{ordered_targets}, #{entry_total} entries, stage #{payload[:stage]}"
end
Class Method Summary collapse
-
.all_indexes ⇒ Array<Chewy::Index>
Eager loads and returns all the indexes defined in the application except Chewy::Stash::Specification and Chewy::Stash::Journal.
-
.journal_apply(time: nil, only: nil, except: nil, output: $stdout) ⇒ Array<Chewy::Index>
Applies changes that were done after the specified time for the specified indexes or all of them.
-
.journal_clean(time: nil, only: nil, except: nil, output: $stdout) ⇒ Array<Chewy::Index>
Removes journal records created before the specified timestamp for the specified indexes or all of them.
- .normalize_index(identifier) ⇒ Object
- .normalize_indexes(*identifiers) ⇒ Object
-
.reindex(source:, dest:, output: $stdout) ⇒ Object
Reindex data from source index to destination index.
-
.reset(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs zero-downtime reindexing of all documents for the specified indexes.
- .subscribed_task_stats(output = $stdout, &block) ⇒ Object
-
.sync(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs synchronization for each passed index if it exists.
-
.update(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs full update for each passed type if the corresponding index exists.
-
.update_mapping(name:, output: $stdout) ⇒ Object
Adds new fields to an existing data stream or index.
-
.upgrade(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs zero-downtime reindexing of all documents for the specified indexes only if a particular index specification was changed.
Class Method Details
.all_indexes ⇒ Array<Chewy::Index>
Eager loads and returns all the indexes defined in the application except Chewy::Stash::Specification and Chewy::Stash::Journal.
197 198 199 200 |
# File 'lib/chewy/rake_helper.rb', line 197
#
# Eager loads Chewy and returns every descendant of Chewy::Index except the
# two stash indexes (Chewy::Stash::Journal and Chewy::Stash::Specification).
#
# @return [Array<Chewy::Index>] presumably an Array, as returned by
#   +descendants+ minus the excluded classes
def all_indexes
  Chewy.eager_load!

  stash_indexes = [Chewy::Stash::Journal, Chewy::Stash::Specification]
  Chewy::Index.descendants - stash_indexes
end
.journal_apply(time: nil, only: nil, except: nil, output: $stdout) ⇒ Array<Chewy::Index>
Applies changes that were done after the specified time for the specified indexes or all of them.
160 161 162 163 164 165 166 167 168 |
# File 'lib/chewy/rake_helper.rb', line 160
#
# Applies journal entries created after +time+ to the selected indexes and
# reports progress on +output+.
#
# @param time [Object] starting point handed to Chewy::Journal#apply; required
# @param only [Object] forwarded to +indexes_from+ (defined outside this excerpt)
# @param except [Object] forwarded to +indexes_from+
# @param output [IO] receiver of the progress messages
# @raise [ArgumentError] when +time+ is nil or false
def journal_apply(time: nil, only: nil, except: nil, output: $stdout)
  raise ArgumentError, 'Please specify the time to start with' unless time

  subscribed_task_stats(output) do
    output.puts "Applying journal entries created after #{time}"

    journal = Chewy::Journal.new(indexes_from(only: only, except: except))
    applied = journal.apply(time)
    # Make it explicit when nothing happened.
    output.puts 'No journal entries were created after the specified time' if applied.zero?
  end
end
.journal_clean(time: nil, only: nil, except: nil, output: $stdout) ⇒ Array<Chewy::Index>
Removes journal records created before the specified timestamp for the specified indexes or all of them.
184 185 186 187 188 189 190 191 |
# File 'lib/chewy/rake_helper.rb', line 184
#
# Cleans the journal for the selected indexes and prints how many entries
# were removed.
#
# @param time [Object, nil] passed to Chewy::Journal#clean; the "created
#   before" line is printed only when it is given
# @param only [Object] forwarded to +indexes_from+ (defined outside this excerpt)
# @param except [Object] forwarded to +indexes_from+
# @param output [IO] receiver of the progress messages
def journal_clean(time: nil, only: nil, except: nil, output: $stdout)
  subscribed_task_stats(output) do
    output.puts "Cleaning journal entries created before #{time}" if time

    journal = Chewy::Journal.new(indexes_from(only: only, except: except))
    response = journal.clean(time)
    # The deleted count is read from the top-level key first, falling back to
    # the nested '_indices' => '_all' structure.
    # NOTE(review): presumably two response formats of the backend — confirm.
    deleted = response['deleted'] || response['_indices']['_all']['deleted']
    output.puts "Cleaned up #{deleted} journal entries"
  end
end
.normalize_index(identifier) ⇒ Object
235 236 237 238 239 |
# File 'lib/chewy/rake_helper.rb', line 235
#
# Turns an index identifier into an index class.
#
# A class that is a strict subclass of Chewy::Index is returned untouched.
# Anything else is stringified, camelized, suffixed with "Index" and looked
# up as a constant.
#
# @param identifier [Class, #to_s] index class or a name to resolve
# @return [Object] the given class, or the result of +constantize+
def normalize_index(identifier)
  if identifier.is_a?(Class) && identifier < Chewy::Index
    identifier
  else
    class_name = "#{identifier.to_s.camelize}Index"
    class_name.constantize
  end
end
.normalize_indexes(*identifiers) ⇒ Object
231 232 233 |
# File 'lib/chewy/rake_helper.rb', line 231
#
# Resolves several identifiers at once via +normalize_index+.
#
# @param identifiers [Array] identifiers, optionally wrapped in one level of
#   arrays (flattened by a single level only)
# @return [Array] one +normalize_index+ result per identifier, in order
def normalize_indexes(*identifiers)
  flat_identifiers = identifiers.flatten(1)
  flat_identifiers.map { |item| normalize_index(item) }
end
.reindex(source:, dest:, output: $stdout) ⇒ Object
Reindex data from source index to destination index.
208 209 210 211 212 213 214 |
# File 'lib/chewy/rake_helper.rb', line 208
#
# Reindexes data from the +source+ index into the +dest+ index and reports
# progress on +output+.
#
# @param source [Object] name of the index the data is read from
# @param dest [Object] name of the index the data is written to
# @param output [IO] receiver of the progress messages
def reindex(source:, dest:, output: $stdout)
  subscribed_task_stats(output) do
    output.puts "Source index is #{source}\nDestination index is #{dest}"
    Chewy::Index.reindex(source: source, dest: dest)
    # Data flows from source to dest, so the destination is the index that
    # has been reindexed; the message names the indexes in that direction.
    output.puts "#{dest} index successfully reindexed with #{source} index data"
  end
end
.reset(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs zero-downtime reindexing of all documents for the specified indexes.
37 38 39 40 41 42 43 44 45 |
# File 'lib/chewy/rake_helper.rb', line 37
#
# Resets every selected index, one after another, via +reset_one+.
#
# +warn_missing_index+, +indexes_from+ and +reset_one+ are helpers defined
# outside this excerpt.
#
# @param only [Object] forwarded to +indexes_from+
# @param except [Object] forwarded to +indexes_from+
# @param parallel [Object] forwarded to +reset_one+
# @param output [IO] receiver of the progress messages
def reset(only: nil, except: nil, parallel: nil, output: $stdout)
  warn_missing_index(output)

  subscribed_task_stats(output) do
    targets = indexes_from(only: only, except: except)
    targets.each { |index| reset_one(index, output, parallel: parallel) }
  end
end
.subscribed_task_stats(output = $stdout, &block) ⇒ Object
241 242 243 244 245 246 247 |
# File 'lib/chewy/rake_helper.rb', line 241
#
# Runs the block while JOURNAL_CALLBACK and IMPORT_CALLBACK (both curried
# with +output+) are subscribed to the 'apply_journal.chewy' and
# 'import_objects.chewy' notifications, then prints the total duration.
#
# The block's value is captured and returned, so callers that build a list of
# processed indexes inside the block get that list back instead of the nil
# returned by the final +puts+.
#
# @param output [IO] receiver of the progress messages
# @yield the task body to run with the subscriptions active
# @return [Object] the value of the block
#   NOTE(review): relies on ActiveSupport::Notifications.subscribed returning
#   its block's value — confirm for the ActiveSupport version in use.
def subscribed_task_stats(output = $stdout, &block)
  start = Time.now

  result =
    ActiveSupport::Notifications.subscribed(JOURNAL_CALLBACK.curry[output], 'apply_journal.chewy') do
      ActiveSupport::Notifications.subscribed(IMPORT_CALLBACK.curry[output], 'import_objects.chewy', &block)
    end

  output.puts "Total: #{human_duration(Time.now - start)}"
  result
end
.sync(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs synchronization for each passed index if it exists.
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/chewy/rake_helper.rb', line 126
#
# Synchronizes each selected index and reports the outcome on +output+.
#
# Inside the block, an index is collected only when its sync result reports
# a positive +:count+; a falsy sync result is reported as a failure and a
# non-positive count as "up to date".
#
# @param only [Object] forwarded to +indexes_from+ (defined outside this excerpt)
# @param except [Object] forwarded to +indexes_from+
# @param parallel [Object] forwarded to the index's +sync+
# @param output [IO] receiver of the progress messages
def sync(only: nil, except: nil, parallel: nil, output: $stdout)
  subscribed_task_stats(output) do
    indexes_from(only: only, except: except).each_with_object([]) do |index, collected|
      output.puts "Synchronizing #{index}"
      output.puts " #{index} doesn't support outdated synchronization" unless index.supports_outdated_sync?

      started_at = Time.now
      outcome = index.sync(parallel: parallel)

      if outcome && outcome[:count].positive?
        missing = outcome[:missing]
        outdated = outcome[:outdated]
        output.puts " Missing documents: #{missing}" if missing.present?
        output.puts " Outdated documents: #{outdated}" if outdated.present?
        collected << index
      elsif outcome
        output.puts " Skipping #{index}, up to date"
      else
        output.puts " Something went wrong with the #{index} synchronization"
      end

      output.puts " Took #{human_duration(Time.now - started_at)}"
    end
  end
end
.update(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs full update for each passed type if the corresponding index exists.
100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/chewy/rake_helper.rb', line 100
#
# Imports every selected index that already exists; indexes that do not
# exist are skipped with a hint pointing at the reset task.
#
# @param only [Object] forwarded to +indexes_from+ (defined outside this excerpt)
# @param except [Object] forwarded to +indexes_from+
# @param parallel [Object] forwarded to the index's +import+
# @param output [IO] receiver of the progress messages
def update(only: nil, except: nil, parallel: nil, output: $stdout)
  subscribed_task_stats(output) do
    indexes_from(only: only, except: except).each_with_object([]) do |index, refreshed|
      unless index.exists?
        output.puts "Skipping #{index}, it does not exists (use rake chewy:reset[#{index.derivable_name}] to create and update it)"
        next
      end

      output.puts "Updating #{index}"
      index.import(parallel: parallel)
      refreshed << index
    end
  end
end
.update_mapping(name:, output: $stdout) ⇒ Object
Adds new fields to an existing data stream or index. Changes the search settings of existing fields.
223 224 225 226 227 228 229 |
# File 'lib/chewy/rake_helper.rb', line 223
#
# Resolves +name+ to an index class via +normalize_index+ and calls
# +update_mapping+ on it, reporting progress on +output+.
#
# @param name [Class, #to_s] index class or identifier to resolve
# @param output [IO] receiver of the progress messages
def update_mapping(name:, output: $stdout)
  subscribed_task_stats(output) do
    output.puts "Index name is #{name}"

    index = normalize_index(name)
    index.update_mapping

    output.puts "#{name} index successfully updated"
  end
end
.upgrade(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>
Performs zero-downtime reindexing of all documents for the specified indexes only if a particular index specification was changed.
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/chewy/rake_helper.rb', line 62
#
# Resets only those selected indexes whose specification reports a change.
#
# Unchanged indexes are listed as skipped, unless nothing changed at all, in
# which case a single summary line is printed. The block evaluates to the
# list of changed indexes.
#
# +warn_missing_index+, +indexes_from+ and +reset_one+ are helpers defined
# outside this excerpt.
#
# @param only [Object] forwarded to +indexes_from+
# @param except [Object] forwarded to +indexes_from+
# @param parallel [Object] forwarded to +reset_one+
# @param output [IO] receiver of the progress messages
def upgrade(only: nil, except: nil, parallel: nil, output: $stdout)
  warn_missing_index(output)

  subscribed_task_stats(output) do
    candidates = indexes_from(only: only, except: except)
    changed = candidates.select { |index| index.specification.changed? }

    if changed.blank?
      output.puts 'No index specification was changed'
    else
      # Walk all candidates (not just the changed ones) so the output keeps
      # the original ordering and mentions every skipped index.
      candidates.each do |index|
        if changed.include?(index)
          reset_one(index, output, parallel: parallel)
        else
          output.puts "Skipping #{index}, the specification didn't change"
        end
      end
    end

    changed
  end
end