Class: Parqueteur::Converter

- Inherits: Object
- Defined in: lib/parqueteur/converter.rb
Constant Summary

- DEFAULT_BATCH_SIZE = 100
Class Method Summary
- .column(name, type, options = {}, &block) ⇒ Object
- .columns ⇒ Object
- .convert(input, **kwargs) ⇒ Object
- .convert_to(input, output_path, **kwargs) ⇒ Object
- .inline(&block) ⇒ Object
- .transform(method_name = nil, &block) ⇒ Object
- .transforms ⇒ Object
Instance Method Summary
- #initialize(input, **kwargs) ⇒ Converter (constructor): A new instance of Converter.
- #split(size, batch_size: nil, compression: nil) ⇒ Object
- #split_by_io(size, batch_size: nil, compression: nil) ⇒ Object
- #to_arrow_table(options = {}) ⇒ Object
- #to_blob(options = {}) ⇒ Object
- #to_io(options = {}) ⇒ Object
- #to_tmpfile(options = {}) ⇒ Object
- #write(path, batch_size: nil, compression: nil) ⇒ Object
Constructor Details
#initialize(input, **kwargs) ⇒ Converter
Returns a new instance of Converter.
# File 'lib/parqueteur/converter.rb', line 38

def initialize(input, **kwargs)
  @input = Parqueteur::Input.from(input)
  @batch_size = kwargs.fetch(:batch_size, DEFAULT_BATCH_SIZE)
  @compression = kwargs.fetch(:compression, nil)&.to_sym
end
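A minimal construction sketch (UserConverter is an illustrative subclass, declared as in the .column example below; the compression symbol is whatever the underlying Parquet writer accepts):

records = [
  { 'id' => 1, 'name' => 'Ada' },
  { 'id' => 2, 'name' => 'Grace' }
]
converter = UserConverter.new(records, batch_size: 500, compression: :gzip)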
Class Method Details
.column(name, type, options = {}, &block) ⇒ Object
# File 'lib/parqueteur/converter.rb', line 15

def self.column(name, type, options = {}, &block)
  columns.add(Parqueteur::Column.new(name, type, options, &block))
end
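A sketch of a schema declaration; the type symbols used here are assumptions about what Parqueteur::Column accepts, not an exhaustive list:

class UserConverter < Parqueteur::Converter
  column :id, :bigint     # assumed type symbol
  column :name, :string   # assumed type symbol
end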
.columns ⇒ Object
# File 'lib/parqueteur/converter.rb', line 11

def self.columns
  @columns ||= Parqueteur::ColumnCollection.new
end
.convert(input, **kwargs) ⇒ Object
# File 'lib/parqueteur/converter.rb', line 27

def self.convert(input, **kwargs)
  new(input, **kwargs).to_io
end
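.convert is a shorthand for new(input, **kwargs).to_io; a usage sketch:

io = UserConverter.convert(records, compression: :gzip)
File.binwrite('users.parquet', io.read)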
.convert_to(input, output_path, **kwargs) ⇒ Object
# File 'lib/parqueteur/converter.rb', line 31

def self.convert_to(input, output_path, **kwargs)
  converter = new(input, **kwargs)
  converter.write(output_path)
end
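.convert_to builds a converter and writes straight to a path; note that kwargs go to the constructor, not to #write. A sketch:

UserConverter.convert_to(records, 'users.parquet', batch_size: 250)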
.inline(&block) ⇒ Object
# File 'lib/parqueteur/converter.rb', line 7

def self.inline(&block)
  Class.new(self, &block)
end
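.inline returns an anonymous subclass whose body is the given block, which is convenient for one-off schemas. A sketch (column type symbols assumed, as above):

converter_class = Parqueteur::Converter.inline do
  column :id, :bigint
  column :email, :string
end
converter_class.convert_to(records, 'out.parquet')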
.transform(method_name = nil, &block) ⇒ Object
# File 'lib/parqueteur/converter.rb', line 23

def self.transform(method_name = nil, &block)
  transforms << (method_name || block)
end
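.transform registers either a method name or a block; how each transform is applied to records happens outside the methods shown on this page, so the sketch below only illustrates the two registration styles (the record-in, record-out shape is an assumption):

class UserConverter < Parqueteur::Converter
  column :email, :string

  transform :normalize_email   # registers an instance method by name
  transform do |record|        # or an inline block
    record.merge('email' => record['email'].strip)
  end

  def normalize_email(record)
    record.merge('email' => record['email'].downcase)
  end
end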
.transforms ⇒ Object
# File 'lib/parqueteur/converter.rb', line 19

def self.transforms
  @transforms ||= []
end
Instance Method Details
#split(size, batch_size: nil, compression: nil) ⇒ Object
# File 'lib/parqueteur/converter.rb', line 44

def split(size, batch_size: nil, compression: nil)
  Enumerator.new do |arr|
    options = {
      batch_size: batch_size || @batch_size,
      compression: compression || @compression
    }
    @input.each_slice(size) do |records|
      local_converter = self.class.new(records, **options)
      file = local_converter.to_tmpfile
      arr << file
      file.close
      file.unlink
    end
  end
end
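#split returns a lazy Enumerator that yields one Tempfile per slice of size input records; each file is closed and unlinked once the next element is requested, so copy or read it inside the iteration block. A sketch:

require 'fileutils'

UserConverter.new(records).split(10_000).each_with_index do |tmpfile, i|
  FileUtils.cp(tmpfile.path, "part-#{i}.parquet")  # consume before the next iteration
end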
#split_by_io(size, batch_size: nil, compression: nil) ⇒ Object
# File 'lib/parqueteur/converter.rb', line 60

def split_by_io(size, batch_size: nil, compression: nil)
  Enumerator.new do |arr|
    options = {
      batch_size: batch_size || @batch_size,
      compression: compression || @compression
    }
    @input.each_slice(size) do |records|
      local_converter = self.class.new(records, **options)
      arr << local_converter.to_io
    end
  end
end
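#split_by_io does the same slicing but yields in-memory IO objects instead of tempfiles; a sketch:

UserConverter.new(records).split_by_io(5_000).each_with_index do |io, i|
  File.binwrite("part-#{i}.parquet", io.read)
end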
#to_arrow_table(options = {}) ⇒ Object
# File 'lib/parqueteur/converter.rb', line 110

def to_arrow_table(options = {})
  file = to_tmpfile(options)
  table = Arrow::Table.load(file.path, format: :parquet)
  file.close
  file.unlink
  table
end
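#to_arrow_table writes to a tempfile and loads it back with red-arrow, so it returns a fully materialized Arrow::Table; a sketch:

table = UserConverter.new(records).to_arrow_table
puts table.n_rows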
#to_blob(options = {}) ⇒ Object
# File 'lib/parqueteur/converter.rb', line 118

def to_blob(options = {})
  to_tmpfile(options).read
end
#to_io(options = {}) ⇒ Object
# File 'lib/parqueteur/converter.rb', line 102

def to_io(options = {})
  tmpfile = to_tmpfile(options)
  strio = StringIO.new(tmpfile.read)
  tmpfile.close
  tmpfile.unlink
  strio
end
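#to_io reads the whole tempfile into a StringIO, which suits APIs that expect an IO but keeps the entire file in memory; a sketch (the storage client is hypothetical):

io = UserConverter.new(records).to_io
storage_client.upload(key: 'users.parquet', body: io)  # hypothetical client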
#to_tmpfile(options = {}) ⇒ Object
# File 'lib/parqueteur/converter.rb', line 94

def to_tmpfile(options = {})
  tempfile = Tempfile.new
  tempfile.binmode
  write(tempfile.path, **options)
  tempfile.rewind
  tempfile
end
#write(path, batch_size: nil, compression: nil) ⇒ Object
# File 'lib/parqueteur/converter.rb', line 73

def write(path, batch_size: nil, compression: nil)
  compression = @compression if compression.nil?
  batch_size = @batch_size if batch_size.nil?
  arrow_schema = self.class.columns.arrow_schema
  writer_properties = Parquet::WriterProperties.new
  if !compression.nil? && compression != false
    writer_properties.set_compression(compression)
  end
  Arrow::FileOutputStream.open(path, false) do |output|
    Parquet::ArrowFileWriter.open(arrow_schema, output, writer_properties) do |writer|
      @input.each_slice(batch_size) do |records|
        arrow_table = build_arrow_table(records)
        writer.write_table(arrow_table, 1024)
      end
    end
  end
  true
end
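#write streams the input in batch_size slices through an Arrow file writer; arguments passed here override the values set in the constructor. A sketch:

converter = UserConverter.new(records)
converter.write('users.parquet', batch_size: 1_000, compression: :gzip)  # compression symbol as accepted by the Arrow Parquet writer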