Class: Avro::IO::DatumReader

Inherits: Object
Defined in: lib/avro/io.rb

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(writers_schema = nil, readers_schema = nil) ⇒ DatumReader

Returns a new instance of DatumReader.



# File 'lib/avro/io.rb', lines 245-248

# Creates a reader for data written with +writers_schema+ and, optionally,
# resolved into +readers_schema+.
#
# @param writers_schema [Object, nil] schema the data was written with
# @param readers_schema [Object, nil] schema to resolve into; when left nil,
#   #read substitutes the writer's schema
def initialize(writers_schema = nil, readers_schema = nil)
  @writers_schema, @readers_schema = writers_schema, readers_schema
end

Instance Attribute Details

#readers_schema ⇒ Object

Returns the value of attribute readers_schema.



# File 'lib/avro/io.rb', line 243

# Schema that decoded data is resolved into; may be nil until #read
# substitutes the writer's schema.
def readers_schema; @readers_schema; end

#writers_schema ⇒ Object

Returns the value of attribute writers_schema.



# File 'lib/avro/io.rb', line 243

# Schema the encoded data was written with; drives how bytes are decoded.
def writers_schema; @writers_schema; end

Class Method Details

.match_schemas(writers_schema, readers_schema) ⇒ Object



# File 'lib/avro/io.rb', lines 239-241

# Decides whether data written with +writers_schema+ can be read as
# +readers_schema+. The decision is delegated entirely to
# Avro::SchemaCompatibility.match_schemas, and its result is returned as-is.
#
# @param writers_schema [Object] schema used to write the data
# @param readers_schema [Object] schema the reader expects
# @return [Object] truthy when the schemas match
def self.match_schemas(writers_schema, readers_schema)
  compatibility = Avro::SchemaCompatibility
  compatibility.match_schemas(writers_schema, readers_schema)
end

Instance Method Details

#read(decoder) ⇒ Object



# File 'lib/avro/io.rb', lines 250-253

# Decodes one datum from +decoder+.
#
# When no reader's schema was configured, the writer's schema is stored as the
# reader's schema. The data is then read without any resolution.
#
# @param decoder [Object] source of the encoded bytes
# @return [Object] the decoded datum
def read(decoder)
  self.readers_schema ||= writers_schema
  read_data(writers_schema, readers_schema, decoder)
end

#read_array(writers_schema, readers_schema, decoder) ⇒ Object



# File 'lib/avro/io.rb', lines 312-329

# Decodes an array. Each item is resolved from the writer's item schema
# to the reader's item schema.
#
# Items arrive in blocks: a long count, then that many items. This repeats
# until a count of zero. A negative count means a block-size long comes
# before the items. That size is read and ignored, because items are decoded
# one at a time.
#
# @param writers_schema [Object] array schema the data was written with (+items+ is used)
# @param readers_schema [Object] array schema the reader expects (+items+ is used)
# @param decoder [Object] source of the encoded values
# @return [Array] decoded items in the order they were encountered
def read_array(writers_schema, readers_schema, decoder)
  decoded = []
  until (count = decoder.read_long).zero?
    if count.negative?
      count = -count
      decoder.read_long # block size; not needed for item-by-item decoding
    end
    count.times do
      decoded << read_data(writers_schema.items, readers_schema.items, decoder)
    end
  end
  decoded
end

#read_data(writers_schema, readers_schema, decoder) ⇒ Object



# File 'lib/avro/io.rb', lines 255-293

# Decodes one value written with +writers_schema+ and resolves it into
# +readers_schema+. The result is passed through the reader schema's
# +type_adapter.decode+ before it is returned.
#
# @param writers_schema [Object] schema the value was written with
# @param readers_schema [Object] schema the caller expects
# @param decoder [Object] source of the encoded bytes
# @return [Object] the decoded (and adapted) value
# @raise [SchemaMatchException] if the schemas cannot be matched
# @raise [AvroError] if the writer's schema type is not recognised
def read_data(writers_schema, readers_schema, decoder)
  # Refuse to decode at all when the schemas are incompatible.
  raise SchemaMatchException.new(writers_schema, readers_schema) unless self.class.match_schemas(writers_schema, readers_schema)

  # The writer wrote a single branch but the reader expects a union. Resolve
  # against the first reader branch that matches the writer's schema.
  if writers_schema.type_sym != :union && readers_schema.type_sym == :union
    branch = readers_schema.schemas.find { |candidate| self.class.match_schemas(writers_schema, candidate) }
    raise SchemaMatchException.new(writers_schema, readers_schema) unless branch

    return read_data(writers_schema, branch, decoder)
  end

  # The byte layout is dictated by the writer's schema type.
  datum =
    case writers_schema.type_sym
    when :null    then decoder.read_null
    when :boolean then decoder.read_boolean
    when :string  then decoder.read_string
    when :int     then decoder.read_int
    when :long    then decoder.read_long
    when :float   then decoder.read_float
    when :double  then decoder.read_double
    when :bytes   then decoder.read_bytes
    when :fixed   then read_fixed(writers_schema, readers_schema, decoder)
    when :enum    then read_enum(writers_schema, readers_schema, decoder)
    when :array   then read_array(writers_schema, readers_schema, decoder)
    when :map     then read_map(writers_schema, readers_schema, decoder)
    when :union   then read_union(writers_schema, readers_schema, decoder)
    when :record, :error, :request then read_record(writers_schema, readers_schema, decoder)
    else
      raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
    end

  readers_schema.type_adapter.decode(datum)
end

#read_default_value(field_schema, default_value) ⇒ Object



# File 'lib/avro/io.rb', lines 388-430

# Converts a field's default value (given as a parsed JSON value) into the
# Ruby value a decoded datum of +field_schema+ would have. Recurses into
# arrays, maps, unions and records.
#
# @param field_schema [Object] schema of the field whose default is converted
# @param default_value [Object] the JSON-derived default for that field
# @return [Object] the converted default
# @raise [AvroError] if +field_schema+ has an unrecognised type
def read_default_value(field_schema, default_value)
  case field_schema.type_sym
  when :null
    nil
  when :boolean
    default_value
  when :int, :long
    Integer(default_value)
  when :float, :double
    Float(default_value)
  when :enum, :fixed, :string, :bytes
    default_value
  when :array
    default_value.map { |json_val| read_default_value(field_schema.items, json_val) }
  when :map
    default_value.each_with_object({}) do |(key, json_val), read_map|
      read_map[key] = read_default_value(field_schema.values, json_val)
    end
  when :union
    # A union's default is interpreted using its first branch.
    read_default_value(field_schema.schemas[0], default_value)
  when :record, :error
    field_schema.fields.each_with_object({}) do |field, read_record|
      json_val = default_value[field.name]
      # Fall back to the field's own default only when no value was given.
      # A plain truthiness test would also discard an explicit `false`.
      json_val = field.default if json_val.nil?
      read_record[field.name] = read_default_value(field.type, json_val)
    end
  else
    raise AvroError, "Unknown type: #{field_schema.type}"
  end
end

#read_enum(writers_schema, readers_schema, decoder) ⇒ Object



# File 'lib/avro/io.rb', lines 299-310

# Decodes an enum. The value is an int index into the writer's symbol list.
#
# Resolution: a symbol the reader's enum does not list is replaced by the
# reader's enum default, but only when the reader schema exposes +default+
# and it is set. Without a default, the writer's symbol is returned
# unchanged. This lenient behaviour is kept for existing callers; the Avro
# spec would signal an error in that case.
#
# @param writers_schema [Object] enum schema the value was written with
# @param readers_schema [Object] enum schema the reader expects
# @param decoder [Object] source of the encoded index
# @return [Object] the resolved symbol
def read_enum(writers_schema, readers_schema, decoder)
  index_of_symbol = decoder.read_int
  read_symbol = writers_schema.symbols[index_of_symbol]

  if !readers_schema.symbols.include?(read_symbol) &&
     readers_schema.respond_to?(:default) && readers_schema.default
    read_symbol = readers_schema.default
  end

  read_symbol
end

#read_fixed(writers_schema, readers_schema, decoder) ⇒ Object



# File 'lib/avro/io.rb', lines 295-297

# Decodes a fixed value by reading exactly as many bytes as the writer's
# schema declares. The reader's schema plays no part.
#
# @return [Object] whatever +decoder.read+ returns for that length
def read_fixed(writers_schema, readers_schema, decoder)
  byte_count = writers_schema.size
  decoder.read(byte_count)
end

#read_map(writers_schema, readers_schema, decoder) ⇒ Object



# File 'lib/avro/io.rb', lines 331-349

# Decodes a map. Keys are strings; each value is resolved from the writer's
# value schema to the reader's value schema.
#
# Entries arrive in blocks: a long count, then that many key/value pairs.
# This repeats until a count of zero. A negative count means a block-size
# long comes before the entries; that size is read and ignored.
#
# @param writers_schema [Object] map schema the data was written with (+values+ is used)
# @param readers_schema [Object] map schema the reader expects (+values+ is used)
# @param decoder [Object] source of the encoded entries
# @return [Hash] decoded entries; a repeated key keeps its last value
def read_map(writers_schema, readers_schema, decoder)
  entries = {}
  until (pending = decoder.read_long).zero?
    if pending.negative?
      pending = -pending
      decoder.read_long # block size; not needed here
    end
    pending.times do
      key = decoder.read_string
      entries[key] = read_data(writers_schema.values, readers_schema.values, decoder)
    end
  end
  entries
end

#read_record(writers_schema, readers_schema, decoder) ⇒ Object



# File 'lib/avro/io.rb', lines 358-386

# Decodes a record written with +writers_schema+ into a Hash keyed by field
# name.
#
# Writer fields the reader also declares are decoded; the others are
# skipped. Reader fields the writer never wrote are filled from their
# defaults.
#
# @param writers_schema [Object] record schema the data was written with
# @param readers_schema [Object] record schema the reader expects
# @param decoder [Object] source of the encoded fields
# @return [Hash] field name => decoded value
# @raise [AvroError] if a reader-only field has no default
def read_record(writers_schema, readers_schema, decoder)
  reader_fields = readers_schema.fields_hash

  record = writers_schema.fields.each_with_object({}) do |wfield, acc|
    rfield = reader_fields[wfield.name]
    if rfield
      acc[wfield.name] = read_data(wfield.type, rfield.type, decoder)
    else
      skip_data(wfield.type, decoder)
    end
  end

  # Every reader field was supplied by the writer; nothing left to default.
  return record unless reader_fields.size > record.size

  writer_fields = writers_schema.fields_hash
  reader_fields.each do |name, rfield|
    next if writer_fields.has_key?(name)
    raise AvroError, "Missing data for #{rfield.type} with no default" unless rfield.default?

    record[rfield.name] = read_default_value(rfield.type, rfield.default)
  end

  record
end

#read_union(writers_schema, readers_schema, decoder) ⇒ Object



# File 'lib/avro/io.rb', lines 351-356

# Decodes a union value. A long branch index picks the writer's branch
# schema, and that branch is then read against the full reader's schema.
#
# @return [Object] the decoded branch value
def read_union(writers_schema, readers_schema, decoder)
  branch_schema = writers_schema.schemas[decoder.read_long]
  read_data(branch_schema, readers_schema, decoder)
end

#skip_array(writers_schema, decoder) ⇒ Object



# File 'lib/avro/io.rb', lines 480-482

# Skips an encoded array. The blocked layout is handled by +skip_blocks+,
# which runs the given block once per item to skip it.
def skip_array(writers_schema, decoder)
  skip_blocks(decoder) do
    skip_data(writers_schema.items, decoder)
  end
end

#skip_data(writers_schema, decoder) ⇒ Object



# File 'lib/avro/io.rb', lines 432-465

# Advances +decoder+ past one value written with +writers_schema+ without
# building the value.
#
# @param writers_schema [Object] schema the value was written with
# @param decoder [Object] source of the encoded bytes
# @raise [AvroError] if the schema type is not recognised
def skip_data(writers_schema, decoder)
  kind = writers_schema.type_sym
  case kind
  when :null, :boolean, :string, :int, :long, :float, :double, :bytes
    # Each primitive maps onto the decoder's matching skip_<type> method.
    decoder.public_send(:"skip_#{kind}")
  when :fixed then skip_fixed(writers_schema, decoder)
  when :enum  then skip_enum(writers_schema, decoder)
  when :array then skip_array(writers_schema, decoder)
  when :map   then skip_map(writers_schema, decoder)
  when :union then skip_union(writers_schema, decoder)
  when :record, :error, :request then skip_record(writers_schema, decoder)
  else
    raise AvroError, "Unknown schema type: #{writers_schema.type}"
  end
end

#skip_enum(writers_schema, decoder) ⇒ Object



# File 'lib/avro/io.rb', lines 471-473

# Skips an enum value. Enums are encoded as an int symbol index, so skipping
# that int is enough; the schema itself is not consulted.
def skip_enum(writers_schema, decoder)
  decoder.skip_int()
end

#skip_fixed(writers_schema, decoder) ⇒ Object



# File 'lib/avro/io.rb', lines 467-469

# Skips a fixed value by advancing the decoder by the size the writer's
# schema declares.
def skip_fixed(writers_schema, decoder)
  byte_count = writers_schema.size
  decoder.skip(byte_count)
end

#skip_map(writers_schema, decoder) ⇒ Object



# File 'lib/avro/io.rb', lines 484-489

# Skips an encoded map. For each entry +skip_blocks+ yields to, the string
# key is skipped first, then the value is skipped using the writer's value
# schema.
def skip_map(writers_schema, decoder)
  skip_blocks(decoder) do
    decoder.skip_string
    skip_data(writers_schema.values, decoder)
  end
end

#skip_record(writers_schema, decoder) ⇒ Object



# File 'lib/avro/io.rb', lines 491-493

# Skips an encoded record by skipping each of the writer's fields in
# declaration order.
def skip_record(writers_schema, decoder)
  writers_schema.fields.each do |field|
    skip_data(field.type, decoder)
  end
end

#skip_union(writers_schema, decoder) ⇒ Object



# File 'lib/avro/io.rb', lines 475-478

# Skips a union value. The long branch index is read first, then only the
# selected branch's encoding is skipped.
def skip_union(writers_schema, decoder)
  branch_schema = writers_schema.schemas[decoder.read_long]
  skip_data(branch_schema, decoder)
end