Class: Fluent::BigQuery::RecordSchema

Inherits:
FieldSchema show all
Defined in:
lib/fluent/plugin/bigquery/schema.rb

Constant Summary collapse

# Lookup table from a lowercase type symbol to the schema class that is
# instantiated for a field of that type. Frozen so it cannot be mutated at
# runtime.
FIELD_TYPES =
{
  string: StringFieldSchema,
  integer: IntegerFieldSchema,
  float: FloatFieldSchema,
  numeric: NumericFieldSchema,
  bignumeric: BigNumericFieldSchema,
  boolean: BooleanFieldSchema,
  timestamp: TimestampFieldSchema,
  date: DateFieldSchema,
  datetime: DateTimeFieldSchema,
  time: TimeFieldSchema,
  json: JsonFieldSchema,
  geography: GeographyFieldSchema,
  record: RecordSchema
}.freeze

Instance Attribute Summary

Attributes inherited from FieldSchema

#mode, #name

Instance Method Summary collapse

Methods inherited from FieldSchema

#format

Constructor Details

#initialize(name, mode = :nullable) ⇒ RecordSchema

Returns a new instance of RecordSchema.



224
225
226
227
# File 'lib/fluent/plugin/bigquery/schema.rb', line 224

# Creates a record schema that starts out with no child fields.
#
# @param name [Object] field name, passed through to the parent constructor
# @param mode [Symbol] field mode, passed through to the parent constructor
#   (defaults to :nullable)
def initialize(name, mode = :nullable)
  super # bare super forwards name and mode exactly as received
  @fields = {}
end

Instance Method Details

#[](name) ⇒ Object



233
234
235
# File 'lib/fluent/plugin/bigquery/schema.rb', line 233

# Looks up the schema stored for a child field.
#
# @param name [Object] key under which the field was stored in @fields
# @return [Object, nil] the stored field schema, or nil when nothing is
#   stored under +name+
def [](name)
  @fields[name]
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


237
238
239
# File 'lib/fluent/plugin/bigquery/schema.rb', line 237

# Tells whether any child field has been stored in this record schema.
#
# @return [Boolean] true when @fields holds no entries
def empty?
  @fields.empty?
end

#format_one(record, is_load: false) ⇒ Object



292
293
294
295
296
297
298
299
300
# File 'lib/fluent/plugin/bigquery/schema.rb', line 292

# Builds a formatted copy of a single record.
#
# Entries whose value is nil are dropped. A value whose key has a schema in
# @fields is replaced by that schema's #format result; a value with no
# matching schema is copied through untouched.
#
# @param record [#each] key/value pairs to format (e.g. a Hash)
# @param is_load [Boolean] forwarded as-is to each field schema's #format
# @return [Hash] the formatted key/value pairs
def format_one(record, is_load: false)
  record.each_with_object({}) do |(key, value), formatted|
    next if value.nil?

    field = @fields[key]
    formatted[key] = field ? field.format(value, is_load: is_load) : value
  end
end

#load_schema(schema) ⇒ Object



256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/fluent/plugin/bigquery/schema.rb', line 256

# Populates @fields from a list of field definitions.
#
# Each definition is read through string keys: 'type' (required), 'name',
# 'mode' (treated as 'nullable' when absent or nil) and, for record fields,
# 'fields' (required; loaded recursively into the child schema). Type and
# mode are downcased and symbolized before use.
#
# Note that a field is stored in @fields before its nested 'fields' are
# checked, so a record definition without 'fields' is still registered when
# the error is raised.
#
# @param schema [#each] field definitions, each responding to #key? and #[]
# @return [Object] whatever schema.each returns
# @raise [ConfigError] when a definition has no 'type', when the type is not
#   a key of FIELD_TYPES, or when a record definition has no 'fields'
def load_schema(schema)
  schema.each do |definition|
    raise ConfigError, 'field must have type' unless definition.key?('type')

    field_name = definition['name']
    field_mode = (definition['mode'] || 'nullable').downcase.to_sym

    type_key = definition['type'].downcase.to_sym
    klass = FIELD_TYPES[type_key]
    raise ConfigError, "Invalid field type: #{definition['type']}" unless klass

    child = klass.new(field_name, field_mode)
    @fields[field_name] = child
    next unless type_key == :record

    raise ConfigError, "record field must have fields" unless definition.key?('fields')
    child.load_schema(definition['fields'])
  end
end

#register_field(name, type) ⇒ Object



276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# File 'lib/fluent/plugin/bigquery/schema.rb', line 276

# Registers a field of the given type under +name+.
#
# A name without a dot is stored directly as a new instance of the class
# FIELD_TYPES maps +type+ to. A name containing a dot is split at its first
# dot: the leading part is handed to register_record_field (defined outside
# this excerpt; presumably it ensures a record schema exists under that
# name), and the remainder is registered on @fields[leading part].
#
# An existing entry under +name+ may only be replaced when its type is
# :timestamp; any other existing entry is treated as a duplicate.
#
# @param name [String] plain or dotted field name
# @param type [Symbol] key into FIELD_TYPES
# @return [Object] the new field schema, or for dotted names whatever the
#   nested register_field call returns
# @raise [ConfigError] when +name+ is already registered with a
#   non-timestamp type, or when +type+ is not a key of FIELD_TYPES
def register_field(name, type)
  duplicate = @fields.key?(name) && @fields[name].type != :timestamp
  raise ConfigError, "field #{name} is registered twice" if duplicate

  unless name[/\./]
    klass = FIELD_TYPES[type]
    raise ConfigError, "[Bug] Invalid field type #{type}" unless klass

    return @fields[name] = klass.new(name)
  end

  # The lookup above matched the first dot; split the name around it.
  dot = Regexp.last_match
  record_name = dot.pre_match
  nested_name = dot.post_match
  register_record_field(record_name)
  @fields[record_name].register_field(nested_name, type)
end

#to_a ⇒ Object



241
242
243
244
245
# File 'lib/fluent/plugin/bigquery/schema.rb', line 241

# Converts every stored child field to its hash form.
#
# @return [Array] the #to_h result of each field schema, in the order the
#   fields are held in @fields
def to_a
  @fields.values.map(&:to_h)
end

#to_h ⇒ Object



247
248
249
250
251
252
253
254
# File 'lib/fluent/plugin/bigquery/schema.rb', line 247

# Describes this record field as a hash.
#
# @return [Hash] with symbol keys :name, :type and :mode (both rendered as
#   upcased strings) and :fields (the result of #to_a)
def to_h
  {
    name: name,
    type: type.to_s.upcase,
    mode: mode.to_s.upcase,
    fields: to_a,
  }
end

#type ⇒ Object



229
230
231
# File 'lib/fluent/plugin/bigquery/schema.rb', line 229

# Type tag of this schema.
#
# @return [Symbol] always :record
def type
  :record
end