Class: RedshiftConnector::S3DataFileBundle
Constant Summary
AbstractDataFileBundle::REPORT_SIZE
Instance Attribute Summary collapse
#batch_size, #logger
Class Method Summary
collapse
-
.for_params(params) ⇒ Object
-
.for_prefix(bucket: S3Bucket.default, prefix:, format:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger) ⇒ Object
-
.for_table(bucket: S3Bucket.default, schema:, table:, txn_id:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger) ⇒ Object
Instance Method Summary
collapse
#all_data_objects, #each_batch, #each_object, #each_row
Constructor Details
#initialize(bucket, prefix, format: :csv, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger) ⇒ S3DataFileBundle
Returns a new instance of S3DataFileBundle.
34
35
36
37
38
39
40
41
|
# File 'lib/redshift_connector/s3_data_file_bundle.rb', line 34
def initialize(bucket, prefix, format: :csv, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger)
super filter: filter, batch_size: batch_size, logger: logger
@bucket = bucket
@prefix = prefix
@format = format
@reader_class = Reader.get(format)
logger.info "reader: #{@reader_class}"
end
|
Instance Attribute Details
#bucket ⇒ Object
Returns the value of attribute bucket.
43
44
45
|
# File 'lib/redshift_connector/s3_data_file_bundle.rb', line 43
def bucket
@bucket
end
|
#prefix ⇒ Object
Returns the value of attribute prefix.
44
45
46
|
# File 'lib/redshift_connector/s3_data_file_bundle.rb', line 44
def prefix
@prefix
end
|
Class Method Details
.for_params(params) ⇒ Object
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
# File 'lib/redshift_connector/s3_data_file_bundle.rb', line 9
def self.for_params(params)
unless params.txn_id
raise ArgumentError, "cannot create bundle: missing txn_id"
end
s3bucket = params.bucket ? S3Bucket.get(params.bucket) : S3Bucket.default
for_table(
bucket: s3bucket,
schema: params.schema,
table: params.table,
txn_id: params.txn_id,
filter: params.filter,
logger: params.logger
)
end
|
.for_prefix(bucket: S3Bucket.default, prefix:, format:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger) ⇒ Object
24
25
26
27
|
# File 'lib/redshift_connector/s3_data_file_bundle.rb', line 24
def self.for_prefix(bucket: S3Bucket.default, prefix:, format:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger)
real_prefix = "#{bucket.prefix}/#{prefix}"
new(bucket, real_prefix, format: format, filter: filter, batch_size: batch_size, logger: logger)
end
|
.for_table(bucket: S3Bucket.default, schema:, table:, txn_id:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger) ⇒ Object
29
30
31
32
|
# File 'lib/redshift_connector/s3_data_file_bundle.rb', line 29
def self.for_table(bucket: S3Bucket.default, schema:, table:, txn_id:, filter: nil, batch_size: 1000, logger: RedshiftConnector.logger)
prefix = "#{bucket.prefix}/#{schema}_export/#{table}/#{txn_id}/#{table}.csv."
new(bucket, prefix, format: :redshift_csv, filter: filter, batch_size: batch_size, logger: logger)
end
|
Instance Method Details
#clear ⇒ Object
59
60
61
62
63
64
65
66
|
# File 'lib/redshift_connector/s3_data_file_bundle.rb', line 59
def clear
pref = File.dirname(@prefix) + '/'
keys = @bucket.objects(prefix: pref).map(&:key)
unless keys.empty?
logger.info "DELETE #{pref}*"
@bucket.delete_objects(keys)
end
end
|
#credential_string ⇒ Object
50
51
52
|
# File 'lib/redshift_connector/s3_data_file_bundle.rb', line 50
def credential_string
@bucket.credential_string
end
|
#data_files ⇒ Object
54
55
56
57
|
# File 'lib/redshift_connector/s3_data_file_bundle.rb', line 54
def data_files
@bucket.objects(prefix: @prefix)
.map {|obj| S3DataFile.new(obj, reader_class: @reader_class) }
end
|
#url ⇒ Object
46
47
48
|
# File 'lib/redshift_connector/s3_data_file_bundle.rb', line 46
def url
"s3://#{@bucket.name}/#{@prefix}"
end
|