Class: InstDataShipper::Dumper
- Inherits: Object
- Inheritance hierarchy:
  - Object
  - InstDataShipper::Dumper
- Includes:
- Hooks
- Defined in:
- lib/inst_data_shipper/dumper.rb
Direct Known Subclasses
Class Method Summary
- .current(executor: nil) ⇒ Object
- .define(include: [], schema:, &blk) ⇒ Object
- .perform_dump(destinations, force_full_tables: nil) ⇒ Object
Instance Method Summary
- #begin_dump(force_full_tables: nil) ⇒ Object
- #export_genre ⇒ Object
- #for_specs! ⇒ Object
- #incremental_since ⇒ Object
- #last_successful_tracker ⇒ Object
- #lookup_table_schema(*identifiers) ⇒ Object
- #lookup_table_schema!(*identifiers) ⇒ Object
- #origin_class ⇒ Object
- #schema ⇒ Object
- #schema_digest ⇒ Object
- #table_is_incremental?(table_def) ⇒ Boolean
- #table_schema_compatible?(table_def, meta_hash) ⇒ Boolean
- #table_schema_hash(table_def) ⇒ Object
- #table_schema_metadata(table_def) ⇒ Object
- #tracker ⇒ Object
Methods included from Hooks
Class Method Details
.current(executor: nil) ⇒ Object
32 33 34 35 36 37 38 39 40 |
# File 'lib/inst_data_shipper/dumper.rb', line 32
#
# Builds a dumper from the context of the batch stored on the current thread
# under CanvasSync::JobBatches::CURRENT_BATCH_THREAD_KEY.
#
# @param executor [Object, nil] forwarded to the dumper constructor
# @return [Object, nil] a new instance of the context's origin class, or nil
#   when the context has no origin class or no tracker id
def self.current(executor: nil)
  active_batch = Thread.current[CanvasSync::JobBatches::CURRENT_BATCH_THREAD_KEY]
  context = active_batch&.context || {}
  origin = context[:origin_class]
  return nil unless origin.present? && context[:tracker_id].present?

  # The origin class may be stored by name; resolve it before instantiating.
  origin = origin.constantize if origin.is_a?(String)
  origin.new(executor: executor)
end
.define(include: [], schema:, &blk) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/inst_data_shipper/dumper.rb', line 17
#
# Creates an anonymous subclass whose #schema returns the given schema and
# whose #enqueue_tasks runs the given block.
#
# @param include [Array<Module>] modules to mix into the new class
# @param schema [Hash] schema definition; its :tables entries are inspected
#   for a :sourcer when no block is given
# @yield body used for #enqueue_tasks
# @return [Class] the new subclass
# @raise [ArgumentError] when no block is given and no table has a :sourcer
def self.define(include: [], schema:, &blk)
  Class.new(self) do
    include(*include) if include.present?

    if blk.nil?
      # Without a block, tasks can only be derived from tables that declare a sourcer.
      has_sourcer = schema[:tables].any? { |table| table[:sourcer].present? }
      raise ArgumentError, "Must provide a block or a schema with source definitions" unless has_sourcer

      blk = -> { auto_enqueue_from_schema }
    end

    define_method(:enqueue_tasks, &blk)
    define_method(:schema) { schema }
  end
end
.perform_dump(destinations, force_full_tables: nil) ⇒ Object
8 9 10 11 12 13 14 15 |
# File 'lib/inst_data_shipper/dumper.rb', line 8
#
# Instantiates the dumper for the given destinations, starts the dump and
# returns its tracker.
#
# @param destinations [Object] passed to the constructor
# @param force_full_tables [Object, nil] forwarded to #begin_dump
# @return [Object] the dumper's tracker
# @raise [RuntimeError] when called on Dumper itself rather than a subclass
def self.perform_dump(destinations, force_full_tables: nil)
  if self == Dumper
    raise "Must subclass Dumper to use perform_dump"
  end

  instance = new(destinations)
  instance.begin_dump(force_full_tables: force_full_tables)
  instance.tracker
end
Instance Method Details
#begin_dump(force_full_tables: nil) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/inst_data_shipper/dumper.rb', line 54
#
# Starts a dump: creates an 'in_progress' DumpBatch tracker, builds the batch
# context, initializes every destination, and enqueues the dump tasks inside a
# root Sidekiq::Batch whose success/death callbacks point back at this class.
#
# @param force_full_tables [Array, nil] stored in the batch context under
#   :force_full_tables (dropped from the context when empty)
# @return [Object] the root Sidekiq::Batch (the result of the `tap`)
# @raise [RuntimeError] "Dump already begun" when @raw_destinations is not present
# @raise [StandardError] any error raised during setup, re-raised after the
#   tracker has been marked 'failed' and a cleanup job has been enqueued
def begin_dump(force_full_tables: nil)
  raise "Dump already begun" unless @raw_destinations.present?

  @tracker = tracker = DumpBatch.create(job_class: self.class.to_s, genre: export_genre, status: 'in_progress')

  @batch_context = context = {
    # TODO Consider behavior if last is still running
    incremental_since: last_successful_tracker&.created_at,
    force_full_tables: force_full_tables || [],
  }

  destinations.each do |dest|
    dest.preinitialize_dump(context)
  end

  begin
    begin
      destinations.each do |dest|
        dest.initialize_dump(context)
      end

      run_hook(:initialize_dump_batch, context)
    ensure
      # Runs whether or not initialization succeeded, so the context always
      # carries the ids the failure-cleanup path below relies on.
      # NOTE(review): batch_context is defined outside this view; it is called
      # right after @batch_context is cleared - confirm what it falls back to.
      @batch_context = nil
      context[:tracker_id] = tracker.id
      context[:origin_class] = batch_context[:origin_class] || self.class.to_s
      context[:destinations] = @raw_destinations
    end

    context.delete(:force_full_tables) if context[:force_full_tables].empty?

    Sidekiq::Batch.new.tap do |batch|
      context[:root_bid] = batch.bid
      tracker.update(batch_id: batch.bid)

      batch.description = "HD #{export_genre} Export #{tracker.id} Root"
      batch.context = context
      batch.on(:success, "#{self.class}#finalize_dump")
      batch.on(:death, "#{self.class}#cleanup_fatal_error!")
      batch.jobs do
        enqueue_tasks
      rescue => ex
        # Enqueue failures are recorded on the tracker and reported, not re-raised.
        delayed :cleanup_fatal_error!
        InstDataShipper.handle_suppressed_error(ex)
        tracker.update(status: 'failed', exception: ex.message, backtrace: ex.backtrace.join("\n"))
      end
    end
  rescue => ex
    if context
      # NOTE(review): no method-level `batch` local is assigned before this
      # point (the earlier `batch` is a block parameter), so `||=` appears to
      # always build a fresh cleanup batch here - confirm this is intended.
      batch ||= Sidekiq::Batch.new.tap do |batch|
        batch.description = "HD #{export_genre} Export #{tracker.id} Early Failure Cleanup"
        batch.context = context
        batch.jobs do
          delayed :cleanup_fatal_error!
        end
      end
    end

    tracker.update(status: 'failed', exception: ex.message, backtrace: ex.backtrace.join("\n"))
    raise ex
  end
end
#export_genre ⇒ Object
124 125 126 |
# File 'lib/inst_data_shipper/dumper.rb', line 124
#
# Genre label for this export.
#
# @return [String] the string form of the receiver's class
def export_genre
  klass = self.class
  klass.to_s
end
#for_specs! ⇒ Object
43 44 45 46 47 48 49 |
# File 'lib/inst_data_shipper/dumper.rb', line 43
#
# Configures this dumper for specs: sets a "speccable://nil" destination, an
# AsyncCaller executor and an unsaved 'in_progress' tracker, and defines a
# #spec_destination singleton method returning the first destination.
#
# @return [self]
def for_specs!
  tap do
    @raw_destinations = ["speccable://nil"]
    @executor = InstDataShipper::Jobs::AsyncCaller.new
    @tracker = DumpBatch.new(
      job_class: self.class.to_s,
      genre: export_genre,
      status: 'in_progress'
    )
    define_singleton_method(:spec_destination) { destinations.first }
  end
end
#incremental_since ⇒ Object
190 191 192 |
# File 'lib/inst_data_shipper/dumper.rb', line 190
#
# Reads the :incremental_since entry of the batch context.
#
# @return [Object, nil] whatever the batch context stores under :incremental_since
def incremental_since
  context = batch_context
  context[:incremental_since]
end
#last_successful_tracker ⇒ Object
120 121 122 |
# File 'lib/inst_data_shipper/dumper.rb', line 120
#
# Most recently created DumpBatch with status 'completed' for this class and
# genre. A truthy result is memoized in @last_successful_tracker.
#
# @return [Object, nil] the newest completed batch, or nil when none is found
def last_successful_tracker
  return @last_successful_tracker if @last_successful_tracker

  completed = DumpBatch.where(job_class: self.class.to_s, genre: export_genre, status: 'completed')
  @last_successful_tracker = completed.order(created_at: :desc).first
end
#lookup_table_schema(*identifiers) ⇒ Object
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/inst_data_shipper/dumper.rb', line 194
#
# Finds the first table definition in schema[:tables] matching any identifier.
# Identifiers are tried in order; nil identifiers are skipped.
#
# @param identifiers [Array<Object, Hash>] a bare value is matched against
#   :warehouse_name; a Hash matches its first key against its first value.
#   The value may be a single item or a list of candidates.
# @return [Object, nil] the matching table definition, or nil when none matches
def lookup_table_schema(*identifiers)
  identifiers.compact.each do |ident|
    key, raw = ident.is_a?(Hash) ? [ident.keys.first, ident.values.first] : [:warehouse_name, ident]
    candidates = Array(raw).compact

    match = schema[:tables].find { |ts| candidates.include?(ts[key]) }
    return match if match
  end

  nil
end
#lookup_table_schema!(*identifiers) ⇒ Object
214 215 216 |
# File 'lib/inst_data_shipper/dumper.rb', line 214
#
# Strict variant of #lookup_table_schema.
#
# @param identifiers [Array<Object, Hash>] forwarded to #lookup_table_schema
# @return [Object] the matching table definition
# @raise [RuntimeError] when no table definition matches
def lookup_table_schema!(*identifiers)
  found = lookup_table_schema(*identifiers)
  return found if found

  raise("No table schema found for #{identifiers.inspect}")
end
#origin_class ⇒ Object
128 129 130 |
# File 'lib/inst_data_shipper/dumper.rb', line 128
#
# Class the dump originated from.
#
# @return [Class] the constantized :origin_class from the batch context, or
#   the receiver's own class when the context has none
def origin_class
  resolved = batch_context[:origin_class]&.constantize
  resolved || self.class
end
#schema ⇒ Object
132 133 134 135 |
# File 'lib/inst_data_shipper/dumper.rb', line 132
#
# Schema for this dumper, read from the origin class's SCHEMA constant.
#
# @return [Object] the value of origin_class::SCHEMA
# @raise [NotImplementedError] when the origin class defines no SCHEMA
def schema
  raise NotImplementedError unless defined?(origin_class::SCHEMA)

  origin_class::SCHEMA
end
#schema_digest ⇒ Object
137 138 139 |
# File 'lib/inst_data_shipper/dumper.rb', line 137
#
# Short fingerprint of the whole schema.
#
# @return [String] first 8 hex characters of the MD5 of the schema's JSON form
def schema_digest
  serialized = schema.to_json
  Digest::MD5.hexdigest(serialized)[0, 8]
end
#table_is_incremental?(table_def) ⇒ Boolean
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/inst_data_shipper/dumper.rb', line 170
#
# Decides whether a table should be dumped incrementally in this run.
#
# @param table_def [Hash, String] a table definition, or a String that is
#   resolved through #lookup_table_schema!
# @return [Boolean] false when there is no incremental_since, when the table's
#   :warehouse_name is listed in the context's :force_full_tables, or when the
#   table has no :incremental config; otherwise the truthiness of the config's
#   :if condition (true when no :if is given)
def table_is_incremental?(table_def)
  return false unless incremental_since.present?

  table_def = lookup_table_schema!(table_def) if table_def.is_a?(String)

  return false if batch_context[:force_full_tables]&.include?(table_def[:warehouse_name])

  # TODO Return false if table's schema changes
  if (inc = table_def[:incremental]).present?
    differ = inc[:if]
    return !!incremental_since if differ.nil?

    # A Symbol names a method on this dumper. It is dispatched with `send`
    # because instance_exec passes no arguments, so a Symbol proc would raise
    # "no receiver given".
    differ = send(differ) if differ.is_a?(Symbol)
    differ = instance_exec(&differ) if differ.is_a?(Proc)
    return !!differ
  end

  false
end
#table_schema_compatible?(table_def, meta_hash) ⇒ Boolean
152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/inst_data_shipper/dumper.rb', line 152
#
# Checks whether a table's current definition is compatible with the metadata
# recorded for it previously. An incompatible result forces a full-table upload.
#
# @param table_def [Hash] current table definition (reads :version)
# @param meta_hash [Hash, nil] previously recorded metadata (reads
#   :table_schema_version and :table_schema_hash), or nil when there is none
# @return [Boolean] true when compatible
def table_schema_compatible?(table_def, meta_hash)
  # Force full-table-upload if:
  # - The table is not present in the last dump
  return false unless meta_hash

  # - The table's explicitly-set versions do not match
  return false if meta_hash[:table_schema_version] != table_def[:version]

  # - The table does not have an explicitly-set version and the schema hash does not match
  return false if !table_def[:version].present? && meta_hash[:table_schema_hash] != table_schema_hash(table_def)

  true
end
#table_schema_hash(table_def) ⇒ Object
166 167 168 |
# File 'lib/inst_data_shipper/dumper.rb', line 166
#
# Short fingerprint of a single table definition.
#
# @param table_def [Object] table definition; must respond to #to_json
# @return [String] first 8 hex characters of the MD5 of the definition's JSON form
def table_schema_hash(table_def)
  payload = table_def.to_json
  Digest::MD5.hexdigest(payload)[0, 8]
end
#table_schema_metadata(table_def) ⇒ Object
141 142 143 144 145 146 147 148 149 150 |
# File 'lib/inst_data_shipper/dumper.rb', line 141
#
# Builds the metadata recorded for a table.
#
# @param table_def [Hash] table definition (reads :warehouse_name and :version)
# @return [Hash] :table_warehouse_name and :table_schema_hash, plus
#   :table_schema_version when the definition has a present :version
def table_schema_metadata(table_def)
  meta = {
    table_warehouse_name: table_def[:warehouse_name],
    table_schema_hash: table_schema_hash(table_def),
  }

  meta[:table_schema_version] = table_def[:version] if table_def[:version].present?

  meta
end
#tracker ⇒ Object
116 117 118 |
# File 'lib/inst_data_shipper/dumper.rb', line 116
#
# DumpBatch tracking this dump. A truthy value is memoized in @tracker;
# otherwise it is looked up by the batch context's :tracker_id.
#
# @return [Object, nil] the tracker, or nil when the context has no tracker id
def tracker
  return @tracker if @tracker

  @tracker =
    if batch_context[:tracker_id].present?
      DumpBatch.find(batch_context[:tracker_id])
    end
end