Class: Chewy::Index::Syncer
- Inherits: Object
  - Object
    - Chewy::Index::Syncer
- Defined in: lib/chewy/index/syncer.rb
Overview
Note: In Rails 4.0, times are converted to JSON with second precision (milliseconds are dropped), so the outdated check is less precise there.
ATTENTION: synchronization may be slow when the synchronized tables are missing a compound index on the primary key and outdated_sync_field.
This class finds missing and outdated documents in Elasticsearch by comparing the ids from the data source with the ids in the ES index. In addition, if outdated_sync_field is defined in the index definition, it compares this field's value for each source object with the value in the corresponding ES document. Usually this field is updated_at: if its value in the source is not equal to the value in the index, the document is outdated and should be reindexed.
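To make the outdated check concrete, here is a minimal standalone sketch in plain Ruby. It assumes the source and index data have already been fetched into Hashes mapping each id to an ISO 8601 updated_at string; these hash shapes and the `outdated_ids_sketch` name are hypothetical illustrations, not Chewy's internal data structures. Ids present on only one side are skipped, since those count as missing rather than outdated.

```ruby
require 'time'

# Hypothetical helper: returns the ids whose updated_at differs between source and index.
# Both arguments are assumed to be Hashes of id (String) => ISO 8601 timestamp (String).
def outdated_ids_sketch(source_data, index_data)
  index_data.each_with_object([]) do |(id, index_value), result|
    source_value = source_data[id]
    # Ids absent from the source are "missing", not "outdated", so skip them here.
    next unless source_value

    result << id if Time.iso8601(source_value) != Time.iso8601(index_value)
  end
end

source = {
  '1' => '2024-01-01T10:00:00Z',
  '2' => '2024-01-02T10:00:00Z',
  '3' => '2024-01-03T10:00:00Z'
}
index = {
  '1' => '2024-01-01T10:00:00Z', # up to date
  '2' => '2024-01-01T09:00:00Z', # stale copy of record 2
  '4' => '2024-01-04T10:00:00Z'  # no longer in the source
}

p outdated_ids_sketch(source, index) # prints ["2"]
```

This sketch uses plain Time equality; the class itself compares date fields at millisecond precision through Syncer.dates_equal.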
To fetch the necessary data from the source, it uses the adapter method Adapter::Base#import_fields; when the Object adapter is used, it is worth reading the corresponding documentation.
If the parallel option is passed to the initializer, source and index data are fetched in parallel, and the outdated-object calculation is then performed in parallel processes. Any subsequent import (if required) is performed in parallel as well.
Constant Summary
- DEFAULT_SYNC_BATCH_SIZE = 20_000
- ISO_DATETIME = /\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/
- OUTDATED_IDS_WORKER =
```ruby
lambda do |outdated_sync_field_type, source_data_hash, index, total, index_data|
  ::Process.setproctitle("chewy [#{index}]: sync outdated calculation (#{::Parallel.worker_number + 1}/#{total})") if index

  index_data.each_with_object([]) do |(id, index_sync_value), result|
    next unless source_data_hash[id]

    outdated = if outdated_sync_field_type == 'date'
      !Chewy::Index::Syncer.dates_equal(typecast_date(source_data_hash[id]), Time.iso8601(index_sync_value))
    else
      source_data_hash[id] != index_sync_value
    end

    result.push(id) if outdated
  end
end
```
- SOURCE_OR_INDEX_DATA_WORKER =
```ruby
lambda do |syncer, index, kind|
  ::Process.setproctitle("chewy [#{index}]: sync fetching data (#{kind})")

  result = case kind
           when :source
             syncer.send(:fetch_source_data)
           when :index
             syncer.send(:fetch_index_data)
           end

  {kind => result}
end
```
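The ISO_DATETIME pattern accepts a space-separated date and time with optional fractional seconds; its seven capture groups are what typecast_date reassembles into an ISO 8601 string. A quick check of what it captures, using made-up timestamps:

```ruby
ISO_DATETIME = /\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/

with_fraction = ISO_DATETIME.match('2024-03-15 08:30:45.250')
p with_fraction.captures # prints ["2024", "03", "15", "08", "30", "45", ".250"]

without_fraction = ISO_DATETIME.match('2024-03-15 08:30:45')
p without_fraction[7] # prints nil: the fractional-seconds group is optional

# A "T" separator or a trailing zone designator does not match.
p ISO_DATETIME.match?('2024-03-15T08:30:45Z') # prints false
```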
Class Method Summary
- .dates_equal(one, two) ⇒ Object
  Compares times with ms precision.
- .typecast_date(string) ⇒ Object
Instance Method Summary
- #initialize(index, parallel: nil) ⇒ Syncer (constructor)
  A new instance of Syncer.
- #missing_ids ⇒ Array<String>
  Finds ids of all the objects that are not indexed yet or have already been deleted from the source.
- #outdated_ids ⇒ Array<String>
  If the index supports outdated sync, compares the outdated_sync_field values of each source object and its corresponding index document and returns the ids of the entities that differ.
- #perform ⇒ Integer?
  Finds all the missing and outdated ids and imports them.
Constructor Details
#initialize(index, parallel: nil) ⇒ Syncer
Returns a new instance of Syncer.
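```ruby
# File 'lib/chewy/index/syncer.rb', line 76

def initialize(index, parallel: nil)
  @index = index
  @parallel = if !parallel || parallel.is_a?(Hash)
    parallel                 # nil/false, or an explicit options Hash, is kept as-is
  elsif parallel.is_a?(Integer)
    {in_processes: parallel} # an Integer is treated as a process count
  else
    {}                       # any other truthy value (e.g. true) means default options
  end
end
```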
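The constructor normalizes the parallel argument into either a falsy value or an options Hash, which #perform later forwards as the parallel: option of import. The sketch below copies that branch into a hypothetical standalone `normalize_parallel` function so the mapping can be checked in isolation.

```ruby
# Hypothetical standalone copy of the normalization in Syncer#initialize.
def normalize_parallel(parallel)
  if !parallel || parallel.is_a?(Hash)
    parallel                   # nil/false (no parallelism) or explicit options
  elsif parallel.is_a?(Integer)
    { in_processes: parallel } # an Integer becomes a process count
  else
    {}                         # any other truthy value: default options
  end
end

normalize_parallel(nil)              # returns nil
normalize_parallel(4)                # returns { in_processes: 4 }
normalize_parallel(true)             # returns {}
normalize_parallel({ in_threads: 2 }) # returns the Hash unchanged
```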
Class Method Details
.dates_equal(one, two) ⇒ Object
Compares times with ms precision.
```ruby
# File 'lib/chewy/index/syncer.rb', line 70

def self.dates_equal(one, two)
  [one.to_i, one.usec / 1000] == [two.to_i, two.usec / 1000]
end
```
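A small worked example of the millisecond comparison, using Ruby's Time with arbitrary timestamps: two times that differ only below the millisecond compare equal, while a value truncated to whole seconds does not match its millisecond-precision original. The method body is copied into a hypothetical top-level `dates_equal` helper.

```ruby
# Copy of Syncer.dates_equal as a top-level helper for experimentation.
def dates_equal(one, two)
  [one.to_i, one.usec / 1000] == [two.to_i, two.usec / 1000]
end

base = 1_700_000_000 # arbitrary epoch second

a = Time.at(base, 123_456, :usec) # millisecond 123
b = Time.at(base, 123_999, :usec) # same millisecond, different microseconds
c = Time.at(base, 124_000, :usec) # next millisecond
d = Time.at(base)                 # same second, sub-second part dropped

p dates_equal(a, b) # prints true
p dates_equal(a, c) # prints false
p dates_equal(a, d) # prints false
```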
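.typecast_date(string) ⇒ Object
Converts a string matching ISO_DATETIME into a Time with a +00:00 offset; any other value is returned unchanged.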
```ruby
# File 'lib/chewy/index/syncer.rb', line 56

def self.typecast_date(string)
  if string.is_a?(String) && (match = ISO_DATETIME.match(string))
    microsec = (match[7].to_r * 1_000_000).to_i
    day = "#{match[1]}-#{match[2]}-#{match[3]}"
    time_with_seconds = "#{match[4]}:#{match[5]}:#{match[6]}"
    microseconds = format('%06d', microsec)
    date = "#{day}T#{time_with_seconds}.#{microseconds}+00:00"
    Time.iso8601(date)
  else
    string
  end
end
```
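To see what typecast_date produces, the sketch below reproduces the method as a hypothetical top-level function (it needs `require 'time'` for Time.iso8601) and feeds it a made-up timestamp in the ISO_DATETIME shape. The result carries a +00:00 offset, and input that does not match the pattern, such as a string that is already ISO 8601, comes back unchanged.

```ruby
require 'time'

ISO_DATETIME = /\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/

# Standalone copy of Syncer.typecast_date for experimentation.
def typecast_date(string)
  if string.is_a?(String) && (match = ISO_DATETIME.match(string))
    # match[7] is the optional fraction such as ".123"; nil.to_r is 0.
    microsec = (match[7].to_r * 1_000_000).to_i
    day = "#{match[1]}-#{match[2]}-#{match[3]}"
    time_with_seconds = "#{match[4]}:#{match[5]}:#{match[6]}"
    microseconds = format('%06d', microsec)
    Time.iso8601("#{day}T#{time_with_seconds}.#{microseconds}+00:00")
  else
    string
  end
end

t = typecast_date('2024-03-15 08:30:45.123')
p t.usec       # prints 123000
p t.utc_offset # prints 0
p typecast_date('2024-03-15T08:30:45Z') # prints "2024-03-15T08:30:45Z" (no match, unchanged)
```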
Instance Method Details
#missing_ids ⇒ Array<String>
Finds ids of all the objects that are not indexed yet or have already been deleted from the source.
```ruby
# File 'lib/chewy/index/syncer.rb', line 101

def missing_ids
  return [] if source_data.blank?

  @missing_ids ||= begin
    source_data_ids = data_ids(source_data)
    index_data_ids = data_ids(index_data)

    (source_data_ids - index_data_ids).concat(index_data_ids - source_data_ids)
  end
end
```
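missing_ids is the symmetric difference of the two id lists: ids that exist only in the source (not indexed yet), followed by ids that exist only in the index (already deleted from the source). A minimal sketch over plain arrays, where the `missing_ids_sketch` name and the sample ids are hypothetical:

```ruby
# Ids only in the source come first, then ids only in the index.
def missing_ids_sketch(source_ids, index_ids)
  (source_ids - index_ids).concat(index_ids - source_ids)
end

p missing_ids_sketch(%w[1 2 3], %w[2 3 4]) # prints ["1", "4"]
```

Unlike this sketch, the method above returns [] as soon as the source data is blank, so an empty source reports no ids at all.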
#outdated_ids ⇒ Array<String>
If the index supports outdated sync, compares the values of the
outdated_sync_field for each object in the source and the corresponding
document in the index, and returns the ids of the entities that differ.
```ruby
# File 'lib/chewy/index/syncer.rb', line 118

def outdated_ids
  return [] if source_data.blank? || index_data.blank? || !@index.supports_outdated_sync?

  @outdated_ids ||= if @parallel
    parallel_outdated_ids
  else
    linear_outdated_ids
  end
end
```
#perform ⇒ Integer?
Finds all the missing and outdated ids and imports them.
```ruby
# File 'lib/chewy/index/syncer.rb', line 90

def perform
  ids = missing_ids | outdated_ids
  return 0 if ids.blank?

  @index.import(ids, parallel: @parallel) && ids.count
end
```
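The control flow of perform can be exercised without Elasticsearch. The sketch below mirrors its body in a hypothetical `perform_sketch` function that takes the id lists explicitly, and uses a hypothetical `FakeIndex` stand-in whose import just records its arguments; `empty?` replaces ActiveSupport's `blank?` so the snippet runs in plain Ruby. In application code the real entry point would be `Chewy::Index::Syncer.new(index, parallel: 4).perform`, following the constructor signature above.

```ruby
# Hypothetical stand-in for a Chewy index that records what it was asked to import.
class FakeIndex
  attr_reader :imported, :parallel

  def initialize(result = true)
    @result = result
  end

  def import(ids, parallel: nil)
    @imported = ids
    @parallel = parallel
    @result
  end
end

# Mirrors the body of Syncer#perform, with the id lists passed in explicitly.
def perform_sketch(index, missing_ids, outdated_ids, parallel: nil)
  ids = missing_ids | outdated_ids # union: an id in both lists is imported once
  return 0 if ids.empty?

  index.import(ids, parallel: parallel) && ids.count
end

index = FakeIndex.new
p perform_sketch(index, %w[1 4], %w[2 4]) # prints 3
p index.imported                          # prints ["1", "4", "2"]
p perform_sketch(FakeIndex.new, [], [])   # prints 0
```

Because of the `&&`, a falsy return value from import makes perform return that value instead of the id count.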