Class: InstDataShipper::Destinations::HostedData

Inherits:
Base
  • Object
Includes:
Concerns::Chunking
Defined in:
lib/inst_data_shipper/destinations/hosted_data.rb

Constant Summary

Constants included from Concerns::Chunking

Concerns::Chunking::DEFAULT_CHUNK_SIZE

Instance Attribute Summary

Attributes inherited from Base

#dumper

Instance Method Summary

Methods included from Concerns::Chunking

#group_key

Methods inherited from Base

#config, #group_key, #initialize, #user_config

Constructor Details

This class inherits a constructor from InstDataShipper::Destinations::Base

Instance Method Details

#chunk_data(generator, table:, extra: nil) ⇒ Object



# File 'lib/inst_data_shipper/destinations/hosted_data.rb', line 65

def chunk_data(generator, table:, extra: nil)
  warehouse_name = table[:warehouse_name]

  super(generator) do |batch, idx|
    bits = [warehouse_name, extra, idx].compact
    temp_file = "#{working_dir}/#{bits.join('.')}.tsv.gz"

    Zlib::GzipWriter.open(temp_file) do |gz|
      batch.each do |row|
        row = row.join("\t") if row.is_a?(Array)
        gz.puts(row)
      end
    end

    yield temp_file

    File.delete(temp_file)
  end
end
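
#chunk_data wraps the batching provided by Concerns::Chunking: each batch is written under working_dir as a gzip-compressed TSV whose name is built from the table's warehouse name, the optional extra label, and the batch index; the temp file path is yielded to the caller's block and the file is deleted afterwards. A minimal sketch of how a caller might pair it with #upload_data_chunk — `rows` and `table_def` here are placeholders, not part of the gem's public API:

# Hypothetical wiring: stream rows into gzipped TSV chunks and upload each one.
destination.chunk_data(rows, table: table_def) do |temp_file|
  destination.upload_data_chunk(table_def, temp_file)
end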

#cleanup_fatal_error ⇒ Object



# File 'lib/inst_data_shipper/destinations/hosted_data.rb', line 96

def cleanup_fatal_error
  hosted_data_client.delete("api/v1/custom_dumps/#{hd_dump_id}/", reason: 'Failure during extraction or transformation') if hd_dump_id.present?
  redis.del(rk(:state))
end

#finalize_dump ⇒ Object



# File 'lib/inst_data_shipper/destinations/hosted_data.rb', line 91

def finalize_dump
  hosted_data_client.put("api/v1/custom_dumps/#{hd_dump_id}/", start_import: true) if hd_dump_id.present?
  redis.del(rk(:state))
end

#initialize_dump(context) ⇒ Object



# File 'lib/inst_data_shipper/destinations/hosted_data.rb', line 46

def initialize_dump(context)
  tags = [
    "ids-schema=#{dumper.schema_digest}",
    "ids-genre=#{dumper.export_genre}",
  ]
  tags << "ids-app=#{Rails.application.class.name.gsub(/::Application$/, '')}" if defined?(Rails) && Rails.application
  tags << "ids-schema-version=#{schema[:version]}" if schema[:version].present?

  dump = hosted_data_client.post(
    'api/v1/custom_dumps/',
    reference_id: tracker.id,
    schema: convert_schema,
    tags: tags,
  ).body.with_indifferent_access

  redis.hset(rk(:state), :dump_id, dump[:id])
  redis.expire(rk(:state), 30.days.to_i)
end
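
#initialize_dump registers a new custom dump with HostedData and stores the returned id in Redis (expiring after 30 days) so later calls can address the dump. The tags attached to the dump encode the schema digest, export genre, application name, and schema version. For illustration only, with made-up values (a Rails app named Canvas, schema digest "abc123", schema version "1.2.0"), the resulting tag array would look like:

# Illustrative values only; real tags come from the dumper and the Rails app name.
tags = [
  "ids-schema=abc123",
  "ids-genre=Canvas",
  "ids-app=Canvas",
  "ids-schema-version=1.2.0",
]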

#preinitialize_dump(context) ⇒ Object



# File 'lib/inst_data_shipper/destinations/hosted_data.rb', line 8

def preinitialize_dump(context)
  if context[:incremental_since].present?
    begin
      last_dump = hosted_data_client.get("api/v1/custom_dumps/last", {
        status: 'imported',
        tags: [
          # We could also include app in the filter, but each app should already have a distinct key in HD
          "ids-genre=#{dumper.export_genre}",
        ],
        include: ["schema"],
      }).body.with_indifferent_access

      if last_dump[:created_at] < context[:incremental_since]
        InstDataShipper.logger.info("Last successful HostedData dump is older than incremental_since - bumping back incremental_since")
        context[:incremental_since] = last_dump[:created_at]
      end

      if (hd_tables = last_dump[:schema]).present?
        metadatas = hd_tables.values.map { |t| t[:ids_meta] }.compact.map { |t| [t[:table_warehouse_name], t] }.to_h
        schema[:tables].each do |ts|
          hd_meta = metadatas[ts[:warehouse_name]]

          if !dumper.table_schema_compatible?(ts, hd_meta)
            InstDataShipper.logger.info("Last successful HostedData dump of #{ts[:warehouse_name]} has a different schema - forcing full table")
            context[:force_full_tables] << ts[:warehouse_name]
          end
        end
      elsif !last_dump[:tags].include?("ids-schema=#{dumper.schema_digest}")
        InstDataShipper.logger.info("Last successful HostedData dump of the same genre has a different schema - not using incremental_since")
        context[:incremental_since] = nil
      end
    rescue Faraday::ResourceNotFound
      InstDataShipper.logger.info("No Last successful HostedData dump of the same genre - not using incremental_since")
      context[:incremental_since] = nil
    end
  end
end
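
#preinitialize_dump negotiates incremental behavior against HostedData's history: it fetches the last imported dump of the same genre, walks incremental_since back if that dump is older, forces a full export for any table whose stored schema metadata no longer matches, and drops incremental_since entirely when the schema digest differs or no prior dump exists. A hedged sketch of the context hash the method expects and mutates — the keys are inferred from the method body, and building the real context is the dumper's job:

# Assumed shape of the mutable dump context (illustrative, not gem API).
context = {
  incremental_since: 2.days.ago,  # may be moved earlier or cleared to nil
  force_full_tables: [],          # warehouse names appended on schema mismatch
}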

#upload_data_chunk(table_def, chunk) ⇒ Object



# File 'lib/inst_data_shipper/destinations/hosted_data.rb', line 85

def upload_data_chunk(table_def, chunk)
  hosted_data_client.put("api/v1/custom_dumps/#{hd_dump_id}/", artifacts: {
    table_name(table_def) => [Faraday::UploadIO.new(chunk, 'application/gzip')],
  })
end
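
Read together, the methods above describe the destination's lifecycle: preinitialize adjusts incremental settings, initialize creates the HostedData dump, chunked table data is uploaded as gzip artifacts, and the dump is either finalized (triggering the import) or cleaned up on fatal error. The sketch below shows the presumed order of calls; the driving loop, `tables`, `rows_for`, and `context` are placeholders for logic that actually lives in the dumper:

# Order-of-operations sketch, not a literal excerpt from the gem.
destination.preinitialize_dump(context)   # consult HostedData history, tune incremental settings
destination.initialize_dump(context)      # POST a new custom dump, cache its id in Redis
tables.each do |table_def|
  destination.chunk_data(rows_for(table_def), table: table_def) do |file|
    destination.upload_data_chunk(table_def, file)  # PUT each gzipped TSV artifact
  end
end
destination.finalize_dump                 # PUT start_import: true, clear Redis state
# destination.cleanup_fatal_error would run instead if extraction/transformation failed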