Class: Schema::Inference::SchemaInferrer

Inherits:
  Object
Defined in:
lib/schema/inference/schema_inferrer.rb

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(separator: '.', convert_types_to_string: false) ⇒ SchemaInferrer

Returns a new instance of SchemaInferrer.



# File 'lib/schema/inference/schema_inferrer.rb', line 6

def initialize(separator: '.', convert_types_to_string: false)
  @separator = separator
  @convert_types_to_string = convert_types_to_string
end
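
The `separator` option is presumably used to join nested field names into the flat 'column'/'field' keys of the resulting schema. A minimal self-contained sketch of that kind of flattening, assuming a hypothetical helper name (`flatten_keys` is not part of this gem's API):

```ruby
# Hypothetical illustration of what the separator option controls:
# joining nested hash keys into flat field names like "user.name".
def flatten_keys(hash, separator = '.', prefix = nil)
  hash.each_with_object({}) do |(key, value), flat|
    path = prefix ? "#{prefix}#{separator}#{key}" : key.to_s
    if value.is_a?(Hash)
      # recurse into nested hashes, carrying the accumulated path
      flat.merge!(flatten_keys(value, separator, path))
    else
      flat[path] = value
    end
  end
end

flatten_keys({ 'user' => { 'name' => 'Ada', 'age' => 36 } })
# => { "user.name" => "Ada", "user.age" => 36 }
```

Passing a different `separator` to the constructor would change the join character, e.g. `'_'` yields keys like `"user_name"`.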

Instance Attribute Details

#separator ⇒ Object

Returns the value of attribute separator.



# File 'lib/schema/inference/schema_inferrer.rb', line 4

def separator
  @separator
end

Instance Method Details

#infer_schema(dataset: [], batch_count: 0, extended: false) ⇒ Hash

Generate a schema based on this collection’s records. We evaluate the schema of each record and then merge all the information together.

Parameters:

  • dataset (Array) (defaults to: [])

    of samples on which we will perform the schema analysis.

  • batch_count (Integer) (defaults to: 0)

    number of batches to process when a block is given instead of a dataset; the block is called with each batch index and must return that batch of records.

  • extended (Boolean) (defaults to: false)

    Set to true to keep each field as a basic type. Set to false to reduce the terminal arrays to a single key (under the type array).

Returns:

  • (Hash)

    with one entry per ‘column’/‘field’. The values contain information about the type and usage.

Raises:

  • (ArgumentError)

    if neither a dataset nor a block with a batch count is passed.

# File 'lib/schema/inference/schema_inferrer.rb', line 20

def infer_schema(dataset: [], batch_count: 0, extended: false)
  # support detecting schemas of single objects
  dataset = [dataset] if dataset.is_a?(Hash)
  validate_dataset(dataset)

  has_dataset = dataset.count > 0 || (block_given? && batch_count > 0)
  raise ArgumentError, 'a dataset or a block with a batch count must be passed' unless has_dataset

  if dataset.is_a?(Array) && dataset.count > 0
    # divide in batches to process in parallel
    per_process = (dataset.count / Parallel.processor_count.to_f).ceil
    batch_count = (dataset.count / per_process.to_f).ceil
  end

  # compute a partial schema per batch, fetching each batch from the
  # block when one is given, otherwise slicing the dataset
  results = parallel_map(batch_count.times) { |i|
    batch = block_given? ? yield(i) : dataset[i*per_process...(i+1)*per_process]
    { partial_schema: data_schema(batch), count: batch.count }
  }

  partial_schemas = results.map { |r| r[:partial_schema] }
  total_count = results.map { |r| r[:count] }.reduce(:+)

  # merge the partial schemas, then sort fields by usage, most used first
  table_schema = process_schema_results(partial_schemas, total_count, extended)
  table_schema.sort_by { |k, v| -v[:usage] }.to_h
end
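
The final merge step combines the per-batch results into one table, and the result is sorted with the most-used fields first. A self-contained sketch of that aggregation idea, assuming each partial schema simply counts how many records contained each field (`merge_partial_schemas` is a hypothetical name; the real `process_schema_results` is a private helper and also tracks types):

```ruby
# Hypothetical sketch of merging partial schemas: sum each field's record
# count across batches, report usage as the fraction of all records that
# contained the field, and sort most-used fields first.
def merge_partial_schemas(partials, total_count)
  merged = Hash.new(0)
  partials.each { |partial| partial.each { |field, count| merged[field] += count } }
  merged
    .map { |field, count| [field, { usage: count / total_count.to_f }] }
    .sort_by { |_field, info| -info[:usage] }
    .to_h
end

partials = [{ 'id' => 2, 'name' => 1 }, { 'id' => 1 }]
merge_partial_schemas(partials, 3)
# => { "id" => { usage: 1.0 }, "name" => { usage: 0.3333333333333333 } }
```

Because each batch is reduced independently, this shape of aggregation is what allows the batches to be processed in parallel before a cheap final merge.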