Class: Schema::Inference::SchemaInferrer
- Inherits: Object
- Class hierarchy: Object → Schema::Inference::SchemaInferrer
- Defined in:
- lib/schema/inference/schema_inferrer.rb
Instance Attribute Summary collapse
-
#separator ⇒ Object
Returns the value of attribute separator.
Instance Method Summary collapse
-
#infer_schema(dataset: [], batch_count: 0, extended: false) ⇒ Hash
Generate a schema based on this collection’s records.
-
#initialize(separator: '.', convert_types_to_string: false) ⇒ SchemaInferrer
constructor
A new instance of SchemaInferrer.
Constructor Details
#initialize(separator: '.', convert_types_to_string: false) ⇒ SchemaInferrer
Returns a new instance of SchemaInferrer.
Source (lib/schema/inference/schema_inferrer.rb, lines 6–9):
# File 'lib/schema/inference/schema_inferrer.rb', line 6 def initialize(separator: '.', convert_types_to_string: false) @separator = separator @convert_types_to_string = convert_types_to_string end |
Instance Attribute Details
#separator ⇒ Object
Returns the value of attribute separator.
Source (lib/schema/inference/schema_inferrer.rb, lines 4–6):
# File 'lib/schema/inference/schema_inferrer.rb', line 4 def separator @separator end |
Instance Method Details
#infer_schema(dataset: [], batch_count: 0, extended: false) ⇒ Hash
Generate a schema based on this collection’s records. We evaluate the schema of each record and then merge all the information together.
Source (lib/schema/inference/schema_inferrer.rb, lines 20–44):
# File 'lib/schema/inference/schema_inferrer.rb', line 20 def infer_schema(dataset: [], batch_count: 0, extended: false) # support detecting schemas of single objects dataset = [dataset] if dataset.is_a?(Hash) validate_dataset(dataset) has_dataset = dataset.count > 0 || (block_given? && batch_count > 0) raise ArgumentError, 'a dataset or a block with a batch count must be passed' unless has_dataset if dataset.is_a?(Array) && dataset.count > 0 # divide in batches to process in parallel per_process = (dataset.count / Parallel.processor_count.to_f).ceil batch_count = (dataset.count / per_process.to_f).ceil end results = parallel_map(batch_count.times) { |i| batch = block_given? ? yield(i) : dataset[i*per_process...(i+1)*per_process] { partial_schema: data_schema(batch), count: batch.count } } partial_schemas = results.map { |r| r[:partial_schema] } total_count = results.map { |r| r[:count] }.reduce(:+) table_schema = process_schema_results(partial_schemas, total_count, extended) table_schema.sort_by { |k, v| -v[:usage] }.to_h end |