Class: Avro::IO::DatumReader

Inherits:
Object
  • Object
show all
Defined in:
lib/avro/io.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(writers_schema = nil, readers_schema = nil) ⇒ DatumReader

Returns a new instance of DatumReader.



246
247
248
249
# File 'lib/avro/io.rb', line 246

# Stores the writer's and reader's schemas on the new instance.
# Both arguments are optional and default to nil.
def initialize(writers_schema=nil, readers_schema=nil)
  @writers_schema, @readers_schema = writers_schema, readers_schema
end

Instance Attribute Details

#readers_schema ⇒ Object

Returns the value of attribute readers_schema.



244
245
246
# File 'lib/avro/io.rb', line 244

# Returns the current value of the @readers_schema instance variable
# (nil when none has been assigned).
def readers_schema
  @readers_schema
end

#writers_schema ⇒ Object

Returns the value of attribute writers_schema.



244
245
246
# File 'lib/avro/io.rb', line 244

# Returns the current value of the @writers_schema instance variable
# (nil when none has been assigned).
def writers_schema
  @writers_schema
end

Class Method Details

.match_schemas(writers_schema, readers_schema) ⇒ Object



240
241
242
# File 'lib/avro/io.rb', line 240

# Delegates to Avro::SchemaCompatibility.match_schemas with the same two
# arguments and returns its result unchanged.
def self.match_schemas(writers_schema, readers_schema)
  Avro::SchemaCompatibility.match_schemas(writers_schema, readers_schema)
end

Instance Method Details

#read(decoder) ⇒ Object



251
252
253
254
# File 'lib/avro/io.rb', line 251

# Reads one datum from +decoder+.
#
# When no reader's schema has been set, the writer's schema is assigned as
# the reader's schema first (and stays assigned afterwards). Returns the
# result of read_data.
def read(decoder)
  self.readers_schema ||= writers_schema
  read_data(writers_schema, readers_schema, decoder)
end

#read_array(writers_schema, readers_schema, decoder) ⇒ Object



313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# File 'lib/avro/io.rb', line 313

# Reads a sequence of item blocks from +decoder+ and returns the items as
# an Array.
#
# Each block starts with a long item count; a count of zero ends the
# sequence. A negative count means the block holds that many items (sign
# dropped) and is followed by one more long (the block size), which is read
# and discarded. Every item is decoded with read_data using the +items+
# schemas of the writer and the reader.
def read_array(writers_schema, readers_schema, decoder)
  items = []
  remaining = decoder.read_long

  until remaining.zero?
    if remaining.negative?
      remaining = remaining.abs
      decoder.read_long # block size: consumed but not used
    end

    remaining.times do
      items.push(read_data(writers_schema.items, readers_schema.items, decoder))
    end

    remaining = decoder.read_long
  end

  items
end

#read_data(writers_schema, readers_schema, decoder) ⇒ Object



256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# File 'lib/avro/io.rb', line 256

# Decodes one datum written with +writers_schema+ and resolves it against
# +readers_schema+.
#
# Raises SchemaMatchException when the class-level match_schemas check
# rejects the pair, or when the reader is a union (and the writer is not)
# and none of the union's branches matches the writer. Raises AvroError for
# a writer type that is not handled below. The decoded value is passed
# through the reader schema's type adapter before being returned.
def read_data(writers_schema, readers_schema, decoder)
  compatible = self.class.match_schemas(writers_schema, readers_schema)
  raise SchemaMatchException.new(writers_schema, readers_schema) unless compatible

  writer_type = writers_schema.type_sym

  # Schema resolution: the reader's schema is a union but the writer's is
  # not, so re-enter with the first union branch that matches the writer.
  if writer_type != :union && readers_schema.type_sym == :union
    branch = readers_schema.schemas.find do |candidate|
      self.class.match_schemas(writers_schema, candidate)
    end
    raise SchemaMatchException.new(writers_schema, readers_schema) unless branch

    return read_data(writers_schema, branch, decoder)
  end

  # Dispatch on the writer's type to the matching decode routine.
  datum =
    case writer_type
    when :null    then decoder.read_null
    when :boolean then decoder.read_boolean
    when :string  then decoder.read_string
    when :int     then decoder.read_int
    when :long    then decoder.read_long
    when :float   then decoder.read_float
    when :double  then decoder.read_double
    when :bytes   then decoder.read_bytes
    when :fixed   then read_fixed(writers_schema, readers_schema, decoder)
    when :enum    then read_enum(writers_schema, readers_schema, decoder)
    when :array   then read_array(writers_schema, readers_schema, decoder)
    when :map     then read_map(writers_schema, readers_schema, decoder)
    when :union   then read_union(writers_schema, readers_schema, decoder)
    when :record, :error, :request
      read_record(writers_schema, readers_schema, decoder)
    else
      raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
    end

  readers_schema.type_adapter.decode(datum)
end

#read_default_value(field_schema, default_value) ⇒ Object



391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
# File 'lib/avro/io.rb', line 391

# Converts a JSON-style default value into the datum form for
# +field_schema+, recursing into arrays, maps, unions and records.
#
# - :null yields nil; :int/:long go through Integer(); :float/:double go
#   through Float(); :boolean, :enum, :fixed, :string and :bytes are
#   returned as given.
# - :union uses the first schema of the union.
# - :record/:error build a Hash keyed by field name; a sub-field that has
#   no value in +default_value+ falls back to that field's own default.
#
# The result is passed through the schema's type adapter before being
# returned. Raises AvroError for an unrecognised schema type.
def read_default_value(field_schema, default_value)
  datum = case field_schema.type_sym
  when :null
    nil
  when :int, :long
    Integer(default_value)
  when :float, :double
    Float(default_value)
  when :boolean, :enum, :fixed, :string, :bytes
    default_value
  when :array
    default_value.map do |json_val|
      read_default_value(field_schema.items, json_val)
    end
  when :map
    default_value.each_with_object({}) do |(key, json_val), entries|
      entries[key] = read_default_value(field_schema.values, json_val)
    end
  when :union
    read_default_value(field_schema.schemas[0], default_value)
  when :record, :error
    field_schema.fields.each_with_object({}) do |field, record|
      json_val = default_value[field.name]
      # Only an absent (nil) value falls back to the field's default; an
      # explicit `false` is a legitimate value and must be kept.
      json_val = field.default if json_val.nil?
      record[field.name] = read_default_value(field.type, json_val)
    end
  else
    raise AvroError, "Unknown type: #{field_schema.type}"
  end

  field_schema.type_adapter.decode(datum)
end

#read_enum(writers_schema, readers_schema, decoder) ⇒ Object



300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/avro/io.rb', line 300

# Reads an int index from +decoder+ and looks up the symbol at that index
# in the writer's symbol list.
#
# If the reader's symbol list does not contain that symbol and the reader
# schema has a default, the default is returned instead; otherwise the
# writer's symbol is returned as-is.
#
# This implementation deviates from the spec by always returning
# a symbol.
def read_enum(writers_schema, readers_schema, decoder)
  written_symbol = writers_schema.symbols[decoder.read_int]
  return written_symbol if readers_schema.symbols.include?(written_symbol)

  readers_schema.default || written_symbol
end

#read_fixed(writers_schema, _readers_schema, decoder) ⇒ Object



296
297
298
# File 'lib/avro/io.rb', line 296

# Reads +writers_schema.size+ from +decoder+ via decoder.read and returns
# what it yields. The reader's schema is not consulted.
def read_fixed(writers_schema, _readers_schema, decoder)
  byte_count = writers_schema.size
  decoder.read(byte_count)
end

#read_map(writers_schema, readers_schema, decoder) ⇒ Object



332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
# File 'lib/avro/io.rb', line 332

# Reads a sequence of key/value blocks from +decoder+ and returns them as
# a Hash.
#
# Each block starts with a long entry count; a count of zero ends the
# sequence. A negative count means the block holds that many entries (sign
# dropped) and is followed by one more long (the block size), which is read
# and discarded. For every entry the key is read as a string, then the value
# is decoded with read_data using the +values+ schemas of writer and reader.
def read_map(writers_schema, readers_schema, decoder)
  entries = {}
  remaining = decoder.read_long

  until remaining.zero?
    if remaining.negative?
      remaining = remaining.abs
      decoder.read_long # block size: consumed but not used
    end

    remaining.times do
      entry_key = decoder.read_string
      entries[entry_key] = read_data(writers_schema.values, readers_schema.values, decoder)
    end

    remaining = decoder.read_long
  end

  entries
end

#read_record(writers_schema, readers_schema, decoder) ⇒ Object



359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
# File 'lib/avro/io.rb', line 359

# Decodes a record written with +writers_schema+ into a Hash keyed by the
# reader's field names.
#
# Writer fields are processed in order: a field the reader knows by name is
# decoded and stored under that name; a field the reader knows only through
# an alias is decoded and stored under the reader field's name; any other
# field is skipped in the decoder. Reader fields that received no value are
# then filled from their defaults. Raises AvroError when such a field has no
# default.
def read_record(writers_schema, readers_schema, decoder)
  fields_by_name = readers_schema.fields_hash
  record = {}

  writers_schema.fields.each do |writers_field|
    if (target = fields_by_name[writers_field.name])
      value = read_data(writers_field.type, target.type, decoder)
      record[writers_field.name] = value
    elsif readers_schema.fields_by_alias.key?(writers_field.name)
      target = readers_schema.fields_by_alias[writers_field.name]
      value = read_data(writers_field.type, target.type, decoder)
      record[target.name] = value
    else
      skip_data(writers_field.type, decoder)
    end
  end

  # Fill in defaults for reader fields the writer did not supply.
  fields_by_name.each do |field_name, field|
    next if record.key?(field_name)
    raise AvroError, "Missing data for #{field.type} with no default" unless field.default?

    record[field.name] = read_default_value(field.type, field.default)
  end

  record
end

#read_union(writers_schema, readers_schema, decoder) ⇒ Object



352
353
354
355
356
357
# File 'lib/avro/io.rb', line 352

# Reads a long from +decoder+, uses it as an index into the writer union's
# schemas, and decodes the datum with read_data using that branch as the
# writer's schema.
def read_union(writers_schema, readers_schema, decoder)
  branch = writers_schema.schemas[decoder.read_long]
  read_data(branch, readers_schema, decoder)
end

#skip_array(writers_schema, decoder) ⇒ Object



483
484
485
# File 'lib/avro/io.rb', line 483

# Skips an array in +decoder+ by handing skip_blocks a block that skips one
# item of the writer's +items+ schema.
def skip_array(writers_schema, decoder)
  skip_blocks(decoder) do
    skip_data(writers_schema.items, decoder)
  end
end

#skip_data(writers_schema, decoder) ⇒ Object



435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
# File 'lib/avro/io.rb', line 435

# Skips one datum of +writers_schema+ in +decoder+ without decoding it.
#
# Primitive types call the decoder's matching skip_* method; the other
# types dispatch to the matching skip_* helper of this reader. Raises
# AvroError for an unrecognised schema type.
def skip_data(writers_schema, decoder)
  case writers_schema.type_sym
  when :null    then decoder.skip_null
  when :boolean then decoder.skip_boolean
  when :string  then decoder.skip_string
  when :int     then decoder.skip_int
  when :long    then decoder.skip_long
  when :float   then decoder.skip_float
  when :double  then decoder.skip_double
  when :bytes   then decoder.skip_bytes
  when :fixed   then skip_fixed(writers_schema, decoder)
  when :enum    then skip_enum(writers_schema, decoder)
  when :array   then skip_array(writers_schema, decoder)
  when :map     then skip_map(writers_schema, decoder)
  when :union   then skip_union(writers_schema, decoder)
  when :record, :error, :request
    skip_record(writers_schema, decoder)
  else
    raise AvroError, "Unknown schema type: #{writers_schema.type}"
  end
end

#skip_enum(_writers_schema, decoder) ⇒ Object



474
475
476
# File 'lib/avro/io.rb', line 474

# Skips an enum value by skipping a single int in +decoder+. The writer's
# schema is accepted for signature parity with the other skip_* methods but
# is not used.
def skip_enum(_writers_schema, decoder)
  decoder.skip_int
end

#skip_fixed(writers_schema, decoder) ⇒ Object



470
471
472
# File 'lib/avro/io.rb', line 470

# Skips a fixed value by passing +writers_schema.size+ to decoder.skip.
def skip_fixed(writers_schema, decoder)
  byte_count = writers_schema.size
  decoder.skip(byte_count)
end

#skip_map(writers_schema, decoder) ⇒ Object



487
488
489
490
491
492
# File 'lib/avro/io.rb', line 487

# Skips a map in +decoder+ by handing skip_blocks a block that skips one
# entry: first the string key, then a value of the writer's +values+ schema.
def skip_map(writers_schema, decoder)
  skip_blocks(decoder) do
    decoder.skip_string
    skip_data(writers_schema.values, decoder)
  end
end

#skip_record(writers_schema, decoder) ⇒ Object



494
495
496
# File 'lib/avro/io.rb', line 494

# Skips a record by skipping each of the writer's fields, in declaration
# order, using the field's type.
def skip_record(writers_schema, decoder)
  writers_schema.fields.each do |field|
    skip_data(field.type, decoder)
  end
end

#skip_union(writers_schema, decoder) ⇒ Object



478
479
480
481
# File 'lib/avro/io.rb', line 478

# Reads a long from +decoder+, uses it as an index into the writer union's
# schemas, and skips one datum of that branch.
def skip_union(writers_schema, decoder)
  branch = writers_schema.schemas[decoder.read_long]
  skip_data(branch, decoder)
end