Class: Parqueteur::Converter

Inherits: Object
Defined in:
lib/parqueteur/converter.rb

Constant Summary

DEFAULT_BATCH_SIZE = 100

Class Method Summary

  • .column(name, type, options = {}, &block) ⇒ Object
  • .columns ⇒ Object
  • .convert(input, **kwargs) ⇒ Object
  • .convert_to(input, output_path, **kwargs) ⇒ Object
  • .inline(&block) ⇒ Object
  • .transform(method_name = nil, &block) ⇒ Object
  • .transforms ⇒ Object

Instance Method Summary

  • #initialize(input, **kwargs) ⇒ Converter (constructor)
  • #split(size, batch_size: nil, compression: nil) ⇒ Object
  • #split_by_io(size, batch_size: nil, compression: nil) ⇒ Object
  • #to_arrow_table(options = {}) ⇒ Object
  • #to_blob(options = {}) ⇒ Object
  • #to_io(options = {}) ⇒ Object
  • #to_tmpfile(options = {}) ⇒ Object
  • #write(path, batch_size: nil, compression: nil) ⇒ Object

Constructor Details

#initialize(input, **kwargs) ⇒ Converter

Returns a new instance of Converter.

Parameters:

  • input (Enumerable)

    an enumerable object

  • kwargs (Hash)

    a customizable set of options

Options Hash (**kwargs):

  • :batch_size (Integer)

    number of records per write batch; defaults to DEFAULT_BATCH_SIZE (100)

  • :compression (Symbol, nil)

    Parquet compression codec (e.g. :gzip), or nil/false for no compression



# File 'lib/parqueteur/converter.rb', line 38

def initialize(input, **kwargs)
  @input = Parqueteur::Input.from(input)
  @batch_size = kwargs.fetch(:batch_size, DEFAULT_BATCH_SIZE)
  @compression = kwargs.fetch(:compression, nil)&.to_sym
end
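
A minimal construction sketch; MyConverter (a hypothetical subclass of Parqueteur::Converter), records (any Enumerable of rows) and the :gzip codec are assumptions for illustration:

  converter = MyConverter.new(records, batch_size: 500, compression: :gzip)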

Class Method Details

.column(name, type, options = {}, &block) ⇒ Object



# File 'lib/parqueteur/converter.rb', line 15

def self.column(name, type, options = {}, &block)
  columns.add(Parqueteur::Column.new(name, type, options, &block))
end
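
A schema declaration sketch; the :integer and :string type names are assumptions about what Parqueteur::Column accepts, not taken from this file:

  class UserConverter < Parqueteur::Converter
    column :id, :integer
    column :name, :string
  end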

.columns ⇒ Object



# File 'lib/parqueteur/converter.rb', line 11

def self.columns
  @columns ||= Parqueteur::ColumnCollection.new
end

.convert(input, **kwargs) ⇒ Object



# File 'lib/parqueteur/converter.rb', line 27

def self.convert(input, **kwargs)
  new(input, **kwargs).to_io
end
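
.convert returns the Parquet payload as an in-memory IO (it delegates to #to_io). Using the hypothetical UserConverter and records from the sketches above:

  io = UserConverter.convert(records, compression: :gzip)
  File.binwrite('users.parquet', io.read)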

.convert_to(input, output_path, **kwargs) ⇒ Object



# File 'lib/parqueteur/converter.rb', line 31

def self.convert_to(input, output_path, **kwargs)
  converter = new(input, **kwargs)
  converter.write(output_path)
end
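
.convert_to writes straight to a path instead of returning an IO; same hypothetical converter:

  UserConverter.convert_to(records, 'users.parquet', batch_size: 250)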

.inline(&block) ⇒ Object



# File 'lib/parqueteur/converter.rb', line 7

def self.inline(&block)
  Class.new(self, &block)
end
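
.inline builds an anonymous subclass, so a one-off converter can be declared without a named class. PointConverter and points are illustrative names, and the column types are again assumptions:

  PointConverter = Parqueteur::Converter.inline do
    column :x, :integer
    column :y, :integer
  end
  PointConverter.convert_to(points, 'points.parquet')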

.transform(method_name = nil, &block) ⇒ Object



# File 'lib/parqueteur/converter.rb', line 23

def self.transform(method_name = nil, &block)
  transforms << (method_name || block)
end
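
How registered transforms are applied is not shown in this file; as a hedged sketch, assuming each transform receives a source record and returns the record the columns are read from:

  class UserConverter < Parqueteur::Converter
    column :full_name, :string

    transform do |record|
      record.merge('full_name' => "#{record['first_name']} #{record['last_name']}")
    end
  end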

.transforms ⇒ Object



# File 'lib/parqueteur/converter.rb', line 19

def self.transforms
  @transforms ||= []
end

Instance Method Details

#split(size, batch_size: nil, compression: nil) ⇒ Object



# File 'lib/parqueteur/converter.rb', line 44

def split(size, batch_size: nil, compression: nil)
  Enumerator.new do |arr|
    options = {
      batch_size: batch_size || @batch_size,
      compression: compression || @compression
    }
    @input.each_slice(size) do |records|
      local_converter = self.class.new(records, **options)
      file = local_converter.to_tmpfile
      arr << file
      file.close
      file.unlink
    end
  end
end
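
Each Tempfile is closed and unlinked as soon as the consuming block returns, so every part has to be used inside the block. For example:

  require 'fileutils'

  converter.split(10_000).each_with_index do |tmpfile, i|
    # copy the part out before the enumerator unlinks it
    FileUtils.cp(tmpfile.path, "part-#{i}.parquet")
  end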

#split_by_io(size, batch_size: nil, compression: nil) ⇒ Object



# File 'lib/parqueteur/converter.rb', line 60

def split_by_io(size, batch_size: nil, compression: nil)
  Enumerator.new do |arr|
    options = {
      batch_size: batch_size || @batch_size,
      compression: compression || @compression
    }
    @input.each_slice(size) do |records|
      local_converter = self.class.new(records, **options)
      arr << local_converter.to_io
    end
  end
end
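
#split_by_io yields in-memory StringIO parts rather than tempfiles, for example:

  converter.split_by_io(10_000).each_with_index do |io, i|
    File.binwrite("part-#{i}.parquet", io.read)
  end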

#to_arrow_table(options = {}) ⇒ Object



# File 'lib/parqueteur/converter.rb', line 110

def to_arrow_table(options = {})
  file = to_tmpfile(options)
  table = Arrow::Table.load(file.path, format: :parquet)
  file.close
  file.unlink
  table
end
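
The return value is an Arrow::Table from the red-arrow gem, so the usual table API applies (n_rows here is an assumption about that API, not part of this gem):

  table = converter.to_arrow_table
  puts table.n_rows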

#to_blob(options = {}) ⇒ Object



# File 'lib/parqueteur/converter.rb', line 118

def to_blob(options = {})
  to_tmpfile(options).read
end
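
#to_blob returns the raw Parquet bytes as a String, e.g.:

  File.binwrite('users.parquet', converter.to_blob)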

#to_io(options = {}) ⇒ Object



# File 'lib/parqueteur/converter.rb', line 102

def to_io(options = {})
  tmpfile = to_tmpfile(options)
  strio = StringIO.new(tmpfile.read)
  tmpfile.close
  tmpfile.unlink
  strio
end
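
#to_io is what .convert delegates to; the returned StringIO holds the whole Parquet payload in memory:

  io = converter.to_io
  io.read.bytesize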

#to_tmpfile(options = {}) ⇒ Object



# File 'lib/parqueteur/converter.rb', line 94

def to_tmpfile(options = {})
  tempfile = Tempfile.new
  tempfile.binmode
  write(tempfile.path, **options)
  tempfile.rewind
  tempfile
end
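
Unlike #to_io and #to_arrow_table, the caller is responsible for cleaning up the returned Tempfile:

  tmpfile = converter.to_tmpfile
  do_something_with(tmpfile.path)  # hypothetical consumer
  tmpfile.close!                   # close and unlink (stdlib Tempfile API)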

#write(path, batch_size: nil, compression: nil) ⇒ Object



# File 'lib/parqueteur/converter.rb', line 73

def write(path, batch_size: nil, compression: nil)
  compression = @compression if compression.nil?
  batch_size = @batch_size if batch_size.nil?
  arrow_schema = self.class.columns.arrow_schema
  writer_properties = Parquet::WriterProperties.new
  if !compression.nil? && compression != false
    writer_properties.set_compression(compression)
  end

  Arrow::FileOutputStream.open(path, false) do |output|
    Parquet::ArrowFileWriter.open(arrow_schema, output, writer_properties) do |writer|
      @input.each_slice(batch_size) do |records|
        arrow_table = build_arrow_table(records)
        writer.write_table(arrow_table, 1024)
      end
    end
  end

  true
end
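
For example, overriding both keywords per call (the :gzip codec name is an assumption about what Parquet::WriterProperties#set_compression accepts):

  converter.write('users.parquet', batch_size: 1_000, compression: :gzip)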