Class: InstDataShipper::Dumper

Inherits:
Object
  • Object
show all
Includes:
Hooks
Defined in:
lib/inst_data_shipper/dumper.rb

Direct Known Subclasses

BasicDumper

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Hooks

#run_hook, #run_hook_safe

Class Method Details

.current(executor: nil) ⇒ Object



32
33
34
35
36
37
38
39
40
# File 'lib/inst_data_shipper/dumper.rb', line 32

# Builds a dumper for the batch that is active on the current thread.
#
# The batch is read from Thread.current under
# CanvasSync::JobBatches::CURRENT_BATCH_THREAD_KEY and its context is inspected.
#
# @param executor [Object, nil] forwarded to the dumper constructor as `executor:`
# @return [Object, nil] a new instance of the context's :origin_class, or nil
#   when there is no batch or its context lacks :origin_class or :tracker_id
def self.current(executor: nil)
  active_batch = Thread.current[CanvasSync::JobBatches::CURRENT_BATCH_THREAD_KEY]
  batch_ctx = active_batch&.context || {}
  origin = batch_ctx[:origin_class]
  return nil unless origin.present? && batch_ctx[:tracker_id].present?

  # The origin class may be stored by name; resolve it before instantiating.
  dumper_class = origin.is_a?(String) ? origin.constantize : origin
  dumper_class.new(executor: executor)
end

.define(include: [], schema:, &blk) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/inst_data_shipper/dumper.rb', line 17

# Creates an anonymous subclass of the receiver with `enqueue_tasks` and
# `schema` defined from the given arguments.
#
# @param include [Array<Module>] modules mixed into the generated class
# @param schema [Hash] schema returned by the generated `schema` method; its
#   :tables entries are checked for a :sourcer when no block is given
# @yield body used for the generated `enqueue_tasks` method
# @return [Class] the generated subclass
# @raise [ArgumentError] when no block is given and no table has a :sourcer
def self.define(include: [], schema:, &blk)
  mixins = include
  table_schema = schema

  Class.new(self) do
    include(*mixins) if mixins.present?

    task_body = blk
    if task_body.nil?
      has_sourcers = table_schema[:tables].any? { |table| table[:sourcer].present? }
      unless has_sourcers
        raise ArgumentError, "Must provide a block or a schema with source definitions"
      end

      # Without an explicit block, fall back to enqueueing from the schema.
      task_body = -> { auto_enqueue_from_schema }
    end

    define_method(:enqueue_tasks, &task_body)
    define_method(:schema) { table_schema }
  end
end

.perform_dump(destinations, force_full_tables: nil) ⇒ Object



8
9
10
11
12
13
14
15
# File 'lib/inst_data_shipper/dumper.rb', line 8

# Instantiates the dumper for the given destinations, starts the dump, and
# returns the dumper's tracker.
#
# @param destinations [Object] passed to the constructor unchanged
# @param force_full_tables [Object, nil] forwarded to #begin_dump
# @return [Object] the value of the new dumper's #tracker
# @raise [RuntimeError] when called on Dumper itself rather than a subclass
def self.perform_dump(destinations, force_full_tables: nil)
  raise "Must subclass Dumper to use perform_dump" if self == Dumper

  new(destinations)
    .tap { |dumper| dumper.begin_dump(force_full_tables: force_full_tables) }
    .tracker
end

Instance Method Details

#begin_dump(force_full_tables: nil) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/inst_data_shipper/dumper.rb', line 54

# Starts a dump: creates the DumpBatch tracker, builds the batch context,
# lets each destination initialize, and opens a root Sidekiq::Batch whose jobs
# are enqueued by `enqueue_tasks`.
#
# @param force_full_tables [Array, nil] stored in the context under
#   :force_full_tables (an empty array when nil) and removed again if empty
# @return [Sidekiq::Batch] the root batch (the receiver of `tap`) on success
# @raise [RuntimeError] "Dump already begun" when @raw_destinations is blank
# @raise [StandardError] any error raised after pre-initialization is re-raised
#   once the tracker has been marked 'failed'
def begin_dump(force_full_tables: nil)
  # NOTE(review): the guard checks that raw destinations are present, although
  # the message talks about the dump having begun — confirm the intent.
  raise "Dump already begun" unless @raw_destinations.present?

  @tracker = tracker = DumpBatch.create(job_class: self.class.to_s, genre: export_genre, status: 'in_progress')

  # @batch_context is set only for the duration of initialization; it is reset
  # to nil in the `ensure` below.
  @batch_context = context = {
    # TODO Consider behavior if last is still running
    incremental_since: last_successful_tracker&.created_at,
    force_full_tables: force_full_tables || [],
  }

  # NOTE(review): this loop runs outside the begin/rescue below, so an error
  # raised here propagates without the tracker being marked 'failed'.
  destinations.each do |dest|
    dest.preinitialize_dump(context)
  end

  begin
    begin
      destinations.each do |dest|
        dest.initialize_dump(context)
      end

      run_hook(:initialize_dump_batch, context)
    ensure
      # Runs whether or not initialization raised, so the context handed to the
      # cleanup batch in the outer rescue also carries these keys.
      @batch_context = nil
      context[:tracker_id] = tracker.id
      # presumably `batch_context` falls back to an enclosing batch's context
      # once @batch_context is nil — it is defined outside this listing; confirm.
      context[:origin_class] = batch_context[:origin_class] || self.class.to_s
      context[:destinations] = @raw_destinations
    end

    context.delete(:force_full_tables) if context[:force_full_tables].empty?

    Sidekiq::Batch.new.tap do |batch|
      context[:root_bid] = batch.bid
      tracker.update(batch_id: batch.bid)

      batch.description = "HD #{export_genre} Export #{tracker.id} Root"
      batch.context = context
      # Callbacks are registered as "ClassName#method" strings.
      batch.on(:success, "#{self.class}#finalize_dump")
      batch.on(:death, "#{self.class}#cleanup_fatal_error!")
      batch.jobs do
        enqueue_tasks
      rescue => ex
        # An enqueue failure is handled here and not re-raised: cleanup is
        # scheduled, the error is reported, and the tracker is marked failed.
        delayed :cleanup_fatal_error!
        InstDataShipper.handle_suppressed_error(ex)
        tracker.update(status: 'failed', exception: ex.message, backtrace: ex.backtrace.join("\n"))
      end
    end
  rescue => ex
    # NOTE(review): `context` is always assigned before this point, and `batch`
    # is not a local of this scope before `||=` (the earlier `batch` is a block
    # parameter), so a fresh cleanup batch is always created here.
    if context
      batch ||= Sidekiq::Batch.new.tap do |batch|
        batch.description = "HD #{export_genre} Export #{tracker.id} Early Failure Cleanup"
        batch.context = context
        batch.jobs do
          delayed :cleanup_fatal_error!
        end
      end
    end
    tracker.update(status: 'failed', exception: ex.message, backtrace: ex.backtrace.join("\n"))
    raise ex
  end
end

#export_genre ⇒ Object



124
125
126
# File 'lib/inst_data_shipper/dumper.rb', line 124

# Genre label used for this dumper's batches and trackers.
#
# @return [String] the string form of the dumper's class
def export_genre
  dumper_class = self.class
  dumper_class.to_s
end

#for_specs! ⇒ Object



43
44
45
46
47
48
49
# File 'lib/inst_data_shipper/dumper.rb', line 43

# Puts the dumper into a spec-friendly state: a fixed "speccable://nil"
# destination, a fresh AsyncCaller executor, an unsaved in-progress tracker,
# and a `spec_destination` singleton method returning the first destination.
#
# @return [self]
def for_specs!
  @raw_destinations = ["speccable://nil"]
  @executor = InstDataShipper::Jobs::AsyncCaller.new

  tracker_attrs = { job_class: self.class.to_s, genre: export_genre, status: 'in_progress' }
  @tracker = DumpBatch.new(**tracker_attrs)

  define_singleton_method(:spec_destination) do
    destinations.first
  end

  self
end

#incremental_since ⇒ Object



190
191
192
# File 'lib/inst_data_shipper/dumper.rb', line 190

# Reads the :incremental_since entry of the batch context.
#
# @return [Object, nil] whatever the context holds under :incremental_since
def incremental_since
  context = batch_context
  context[:incremental_since]
end

#last_successful_tracker ⇒ Object



120
121
122
# File 'lib/inst_data_shipper/dumper.rb', line 120

# Most recently created DumpBatch with status 'completed' for this dumper's
# class and genre. A truthy result is memoized in @last_successful_tracker.
#
# @return [DumpBatch, nil]
def last_successful_tracker
  @last_successful_tracker ||= begin
    completed = DumpBatch.where(job_class: self.class.to_s, genre: export_genre, status: 'completed')
    completed.order(created_at: :desc).first
  end
end

#lookup_table_schema(*identifiers) ⇒ Object



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/inst_data_shipper/dumper.rb', line 194

# Finds the first table definition in `schema[:tables]` matching any of the
# given identifiers, trying identifiers in order.
#
# A Hash identifier is matched by its first key/value pair; any other
# identifier is matched against :warehouse_name. The value may be a single
# item or an array of acceptable items. nil identifiers and values are skipped.
#
# @param identifiers [Array<Object>] identifiers to try
# @return [Object, nil] the matching table definition, or nil when none matches
def lookup_table_schema(*identifiers)
  identifiers.compact.each do |identifier|
    attribute, wanted = identifier.is_a?(Hash) ? identifier.first : [:warehouse_name, identifier]
    candidates = Array(wanted).compact

    match = schema[:tables].find { |table| candidates.include?(table[attribute]) }
    return match if match
  end

  nil
end

#lookup_table_schema!(*identifiers) ⇒ Object



214
215
216
# File 'lib/inst_data_shipper/dumper.rb', line 214

# Same lookup as #lookup_table_schema, but raises when nothing matches.
#
# @param identifiers [Array<Object>] forwarded to #lookup_table_schema
# @return [Object] the matching table definition
# @raise [RuntimeError] when no table definition matches
def lookup_table_schema!(*identifiers)
  found = lookup_table_schema(*identifiers)
  return found if found

  raise "No table schema found for #{identifiers.inspect}"
end

#origin_class ⇒ Object



128
129
130
# File 'lib/inst_data_shipper/dumper.rb', line 128

# Class that originated the current batch, falling back to the dumper's own
# class when the batch context has no :origin_class.
#
# The context value may be a class name or an already-resolved class, which
# matches how `.current` treats the same context key.
#
# @return [Class]
def origin_class
  origin = batch_context[:origin_class]
  # Only names need resolving; a Class stored in the context is used as-is.
  origin = origin.constantize if origin.is_a?(String)
  origin || self.class
end

#schema ⇒ Object

Raises:

  • (NotImplementedError)


132
133
134
135
# File 'lib/inst_data_shipper/dumper.rb', line 132

# Schema for this dumper, taken from the origin class's SCHEMA constant.
#
# @return [Object] the value of `origin_class::SCHEMA`
# @raise [NotImplementedError] when the origin class defines no SCHEMA
def schema
  klass = origin_class
  raise NotImplementedError unless defined?(klass::SCHEMA)

  klass::SCHEMA
end

#schema_digest ⇒ Object



137
138
139
# File 'lib/inst_data_shipper/dumper.rb', line 137

# Short fingerprint of the whole schema.
#
# @return [String] first 8 hex characters of the MD5 of the schema's JSON
def schema_digest
  serialized = schema.to_json
  Digest::MD5.hexdigest(serialized)[0, 8]
end

#table_is_incremental?(table_def) ⇒ Boolean

Returns:

  • (Boolean)


170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/inst_data_shipper/dumper.rb', line 170

# Decides whether a table should be dumped incrementally in this batch.
#
# A table is incremental only when the batch has an :incremental_since value,
# the table is not listed in the context's :force_full_tables, and its
# definition has an :incremental entry whose optional :if condition is truthy.
#
# The :if condition may be:
# - nil / absent: the table is incremental
# - a Symbol: name of a method called on this dumper
# - a Proc: evaluated in the context of this dumper
# - any other value: used directly for its truthiness
#
# @param table_def [Hash, String] table definition, or a name resolved through
#   #lookup_table_schema!
# @return [Boolean]
def table_is_incremental?(table_def)
  return false unless incremental_since.present?

  table_def = lookup_table_schema!(table_def) if table_def.is_a?(String)

  return false if batch_context[:force_full_tables]&.include?(table_def[:warehouse_name])

  # TODO Return false if table's schema changes
  incremental = table_def[:incremental]
  return false unless incremental.present?

  condition = incremental[:if]
  # No condition: incremental whenever a baseline exists (checked above).
  return true if condition.nil?

  outcome =
    case condition
    # A Symbol proc run through instance_exec receives no receiver argument and
    # raises ArgumentError, so Symbols are dispatched as method calls instead.
    when Symbol then send(condition)
    when Proc then instance_exec(&condition)
    else condition
    end

  !!outcome
end

#table_schema_compatible?(table_def, meta_hash) ⇒ Boolean

Returns:

  • (Boolean)


152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/inst_data_shipper/dumper.rb', line 152

# Checks whether a table's current definition is compatible with the metadata
# recorded for it. A false result forces a full-table upload.
#
# Incompatible when:
# - there is no recorded metadata for the table
# - the recorded :table_schema_version differs from the definition's :version
# - the definition has no :version and the recorded :table_schema_hash differs
#   from the definition's current hash
#
# @param table_def [Hash] table definition
# @param meta_hash [Hash, nil] recorded metadata for the table
# @return [Boolean]
def table_schema_compatible?(table_def, meta_hash)
  return false unless meta_hash

  declared_version = table_def[:version]
  return false unless meta_hash[:table_schema_version] == declared_version

  # An explicit, matching version is authoritative; the hash is not consulted.
  return true if declared_version.present?

  meta_hash[:table_schema_hash] == table_schema_hash(table_def)
end

#table_schema_hash(table_def) ⇒ Object



166
167
168
# File 'lib/inst_data_shipper/dumper.rb', line 166

# Short fingerprint of a single table definition.
#
# @param table_def [Object] table definition; serialized with `to_json`
# @return [String] first 8 hex characters of the MD5 of the definition's JSON
def table_schema_hash(table_def)
  serialized = table_def.to_json
  Digest::MD5.hexdigest(serialized).slice(0, 8)
end

#table_schema_metadata(table_def) ⇒ Object



141
142
143
144
145
146
147
148
149
150
# File 'lib/inst_data_shipper/dumper.rb', line 141

# Builds the metadata recorded for a table: its warehouse name, its schema
# hash, and its explicit version when the definition has one.
#
# @param table_def [Hash] table definition
# @return [Hash] with :table_warehouse_name and :table_schema_hash, plus
#   :table_schema_version only when the definition's :version is present
def table_schema_metadata(table_def)
  meta = {
    table_warehouse_name: table_def[:warehouse_name],
    table_schema_hash: table_schema_hash(table_def),
  }

  meta[:table_schema_version] = table_def[:version] if table_def[:version].present?

  meta
end

#tracker ⇒ Object



116
117
118
# File 'lib/inst_data_shipper/dumper.rb', line 116

# DumpBatch tracking this dump. A truthy value in @tracker is returned as-is;
# otherwise the tracker is looked up by the batch context's :tracker_id.
#
# @return [DumpBatch, nil] nil when the context has no :tracker_id
def tracker
  return @tracker if @tracker

  tracker_id = batch_context[:tracker_id]
  @tracker = (DumpBatch.find(tracker_id) if tracker_id.present?)
end