Class: Avro::IO::DatumReader
- Inherits: Object
  - Ancestry: Object > Avro::IO::DatumReader
- Defined in: lib/avro/io.rb
Instance Attribute Summary
- #readers_schema ⇒ Object
  Returns the value of attribute readers_schema (read/write).
- #writers_schema ⇒ Object
  Returns the value of attribute writers_schema.
Class Method Summary
- .match_schemas(writers_schema, readers_schema) ⇒ Object
Instance Method Summary
- #initialize(writers_schema = nil, readers_schema = nil) ⇒ DatumReader (constructor)
  Returns a new instance of DatumReader.
- #read(decoder) ⇒ Object
- #read_array(writers_schema, readers_schema, decoder) ⇒ Object
- #read_data(writers_schema, readers_schema, decoder) ⇒ Object
- #read_default_value(field_schema, default_value) ⇒ Object
- #read_enum(writers_schema, readers_schema, decoder) ⇒ Object
- #read_fixed(writers_schema, _readers_schema, decoder) ⇒ Object
- #read_map(writers_schema, readers_schema, decoder) ⇒ Object
- #read_record(writers_schema, readers_schema, decoder) ⇒ Object
- #read_union(writers_schema, readers_schema, decoder) ⇒ Object
- #skip_array(writers_schema, decoder) ⇒ Object
- #skip_data(writers_schema, decoder) ⇒ Object
- #skip_enum(_writers_schema, decoder) ⇒ Object
- #skip_fixed(writers_schema, decoder) ⇒ Object
- #skip_map(writers_schema, decoder) ⇒ Object
- #skip_record(writers_schema, decoder) ⇒ Object
- #skip_union(writers_schema, decoder) ⇒ Object
Constructor Details
#initialize(writers_schema = nil, readers_schema = nil) ⇒ DatumReader
Returns a new instance of DatumReader.
246 247 248 249 |
# File 'lib/avro/io.rb', line 246
# Creates a reader for data encoded with +writers_schema+.
#
# @param writers_schema the schema the data was written with (may be nil)
# @param readers_schema the schema to resolve data to; when left nil,
#   #read substitutes the writer's schema on first use
def initialize(writers_schema = nil, readers_schema = nil)
  @writers_schema = writers_schema
  @readers_schema = readers_schema
end
Instance Attribute Details
#readers_schema ⇒ Object
Returns the value of attribute readers_schema.
244 245 246 |
# File 'lib/avro/io.rb', line 244
# The schema decoded data is resolved to. It is set by the constructor, and
# #read fills it in with the writer's schema when it is still unset.
def readers_schema
  @readers_schema
end
#writers_schema ⇒ Object
Returns the value of attribute writers_schema.
244 245 246 |
# File 'lib/avro/io.rb', line 244
# The schema the encoded data was written with, as given to the constructor.
def writers_schema
  @writers_schema
end
Class Method Details
.match_schemas(writers_schema, readers_schema) ⇒ Object
240 241 242 |
# File 'lib/avro/io.rb', line 240
# Class-level hook that read_data calls (as self.class.match_schemas) to
# decide whether data written with +writers_schema+ may be read as
# +readers_schema+. The decision is delegated entirely to
# Avro::SchemaCompatibility.match_schemas.
def self.match_schemas(writers_schema, readers_schema)
  compatibility = Avro::SchemaCompatibility
  compatibility.match_schemas(writers_schema, readers_schema)
end
Instance Method Details
#read(decoder) ⇒ Object
251 252 253 254 |
# File 'lib/avro/io.rb', line 251
# Reads one datum from +decoder+.
#
# When no reader's schema has been configured, the writer's schema is stored
# as the reader's schema, so the datum is read exactly as it was written.
#
# @param decoder the decoder positioned at the start of the datum
# @return the value produced by read_data
def read(decoder)
  self.readers_schema ||= writers_schema
  read_data(writers_schema, readers_schema, decoder)
end
#read_array(writers_schema, readers_schema, decoder) ⇒ Object
313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 |
# File 'lib/avro/io.rb', line 313
# Reads a block-encoded array and returns its items as a Ruby Array.
#
# The data is a sequence of blocks, each starting with a long item count. A
# count of zero ends the array. A negative count means its absolute value is
# the item count and is followed by a long byte size, which is read and
# discarded here.
#
# @return [Array] the items, each resolved through read_data
def read_array(writers_schema, readers_schema, decoder)
  result = []
  loop do
    count = decoder.read_long
    break if count.zero?

    if count.negative?
      count = -count
      decoder.read_long # block size in bytes; not needed when reading items
    end
    count.times do
      result << read_data(writers_schema.items, readers_schema.items, decoder)
    end
  end
  result
end
#read_data(writers_schema, readers_schema, decoder) ⇒ Object
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 |
# File 'lib/avro/io.rb', line 256
# Reads one datum encoded with +writers_schema+ from +decoder+, resolves it
# to +readers_schema+ and returns it after the reader's type adapter has been
# applied.
#
# @param writers_schema the schema the datum was written with
# @param readers_schema the schema the datum is resolved to
# @param decoder the decoder positioned at the start of the datum
# @return the decoded value
# @raise [SchemaMatchException] if the schemas cannot be matched
# @raise [AvroError] if the writer's schema type is not recognized
def read_data(writers_schema, readers_schema, decoder)
  # schema matching
  unless self.class.match_schemas(writers_schema, readers_schema)
    raise SchemaMatchException.new(writers_schema, readers_schema)
  end

  # schema resolution: reader's schema is a union, writer's
  # schema is not
  if writers_schema.type_sym != :union && readers_schema.type_sym == :union
    rs = readers_schema.schemas.find { |s| self.class.match_schemas(writers_schema, s) }
    return read_data(writers_schema, rs, decoder) if rs
    raise SchemaMatchException.new(writers_schema, readers_schema)
  end

  # A writer's union is handled by read_union, which recurses into read_data
  # with the selected branch. That recursive call already applies the
  # reader's type adapter, so applying it again here would decode the value
  # twice. This matters when the reader's schema is a non-union type with an
  # adapter, e.g. writer ["null", X] read as plain X.
  if writers_schema.type_sym == :union
    return read_union(writers_schema, readers_schema, decoder)
  end

  # function dispatch for reading data based on type of writer's
  # schema
  datum = case writers_schema.type_sym
  when :null then decoder.read_null
  when :boolean then decoder.read_boolean
  when :string then decoder.read_string
  when :int then decoder.read_int
  when :long then decoder.read_long
  when :float then decoder.read_float
  when :double then decoder.read_double
  when :bytes then decoder.read_bytes
  when :fixed then read_fixed(writers_schema, readers_schema, decoder)
  when :enum then read_enum(writers_schema, readers_schema, decoder)
  when :array then read_array(writers_schema, readers_schema, decoder)
  when :map then read_map(writers_schema, readers_schema, decoder)
  when :record, :error, :request then read_record(writers_schema, readers_schema, decoder)
  else
    raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
  end

  readers_schema.type_adapter.decode(datum)
end
#read_default_value(field_schema, default_value) ⇒ Object
391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 |
# File 'lib/avro/io.rb', line 391
# Converts a JSON-decoded default value into the in-memory datum for
# +field_schema+, then applies the schema's type adapter.
#
# @param field_schema the schema the default value is interpreted against
# @param default_value the JSON-decoded default (Hash for records/maps,
#   Array for arrays, scalars otherwise)
# @return the converted default
# @raise [AvroError] if +field_schema+ has an unrecognized type
def read_default_value(field_schema, default_value)
  # Basically a JSON Decoder?
  datum = case field_schema.type_sym
  when :null
    nil
  when :int, :long
    Integer(default_value)
  when :float, :double
    Float(default_value)
  when :boolean, :enum, :fixed, :string, :bytes
    default_value
  when :array
    default_value.map { |json_val| read_default_value(field_schema.items, json_val) }
  when :map
    default_value.each_with_object({}) do |(key, json_val), read_map|
      read_map[key] = read_default_value(field_schema.values, json_val)
    end
  when :union
    # A union's default is interpreted against its first branch.
    read_default_value(field_schema.schemas[0], default_value)
  when :record, :error
    field_schema.fields.each_with_object({}) do |field, read_record|
      # Test for presence with key? rather than truthiness. Otherwise an
      # explicit false (or null) in the record's default would be replaced
      # by the field's own default.
      json_val = default_value.key?(field.name) ? default_value[field.name] : field.default
      read_record[field.name] = read_default_value(field.type, json_val)
    end
  else
    raise AvroError, "Unknown type: #{field_schema.type}"
  end

  field_schema.type_adapter.decode(datum)
end
#read_enum(writers_schema, readers_schema, decoder) ⇒ Object
300 301 302 303 304 305 306 307 308 309 310 311 |
# File 'lib/avro/io.rb', line 300
# Reads an enum value: an int index into the writer's symbol list.
#
# If the symbol is not among the reader's symbols and the reader's enum has a
# default, the default is returned instead. This implementation deviates from
# the spec: when there is no such default, the writer's symbol is still
# returned rather than an error being raised.
#
# @return the resolved symbol (an entry of a schema's +symbols+ list)
def read_enum(writers_schema, readers_schema, decoder)
  symbol = writers_schema.symbols[decoder.read_int]
  return symbol if readers_schema.symbols.include?(symbol)

  readers_schema.default || symbol
end
#read_fixed(writers_schema, _readers_schema, decoder) ⇒ Object
296 297 298 |
# File 'lib/avro/io.rb', line 296
# Reads a fixed-size value: exactly writers_schema.size raw bytes. The
# reader's schema is not consulted.
def read_fixed(writers_schema, _readers_schema, decoder)
  byte_count = writers_schema.size
  decoder.read(byte_count)
end
#read_map(writers_schema, readers_schema, decoder) ⇒ Object
332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 |
# File 'lib/avro/io.rb', line 332
# Reads a block-encoded map and returns it as a Ruby Hash.
#
# Blocks start with a long entry count. A count of zero ends the map. A
# negative count means its absolute value is the entry count and is followed
# by a long byte size, which is read and discarded. Each entry is a string key
# followed by a value resolved through read_data.
#
# @return [Hash] the decoded entries
def read_map(writers_schema, readers_schema, decoder)
  entries = {}
  loop do
    count = decoder.read_long
    break if count.zero?

    if count.negative?
      count = -count
      decoder.read_long # block size in bytes; not needed when reading entries
    end
    count.times do
      key = decoder.read_string
      entries[key] = read_data(writers_schema.values, readers_schema.values, decoder)
    end
  end
  entries
end
#read_record(writers_schema, readers_schema, decoder) ⇒ Object
359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 |
# File 'lib/avro/io.rb', line 359
# Reads a record written with +writers_schema+ and resolves it to
# +readers_schema+, returning a Hash keyed by field name.
#
# Writer fields are consumed in encoded order:
# - a field whose name is in the reader's fields_hash is read under that name;
# - a field matched through the reader's fields_by_alias is read under the
#   reader's field name;
# - any other field is skipped.
# Reader fields still absent afterwards are filled from their defaults.
#
# @raise [AvroError] if a missing reader field has no default
def read_record(writers_schema, readers_schema, decoder)
  readers_fields_hash = readers_schema.fields_hash
  record = {}

  writers_schema.fields.each do |writer_field|
    name = writer_field.name
    if (by_name = readers_fields_hash[name])
      record[name] = read_data(writer_field.type, by_name.type, decoder)
    elsif readers_schema.fields_by_alias.key?(name)
      by_alias = readers_schema.fields_by_alias[name]
      record[by_alias.name] = read_data(writer_field.type, by_alias.type, decoder)
    else
      # Unknown to the reader, but its bytes must still be consumed.
      skip_data(writer_field.type, decoder)
    end
  end

  readers_fields_hash.each do |field_name, reader_field|
    next if record.key?(field_name)
    unless reader_field.default?
      raise AvroError, "Missing data for #{reader_field.type} with no default"
    end

    record[reader_field.name] = read_default_value(reader_field.type, reader_field.default)
  end

  record
end
#read_union(writers_schema, readers_schema, decoder) ⇒ Object
352 353 354 355 356 357 |
# File 'lib/avro/io.rb', line 352
# Reads a union value: a long branch index followed by a datum encoded with
# that branch of +writers_schema+. The datum is resolved to +readers_schema+
# through read_data.
#
# @raise [AvroError] if the encoded branch index is outside the union
def read_union(writers_schema, readers_schema, decoder)
  index_of_schema = decoder.read_long
  branches = writers_schema.schemas
  # The index comes from the data, so validate it. A negative index would
  # otherwise silently pick a branch counted from the end of the Array, and
  # a too-large one would hand nil to read_data.
  unless (0...branches.size).cover?(index_of_schema)
    raise AvroError, "Can't access branch index #{index_of_schema} for union with #{branches.size} branches"
  end

  read_data(branches[index_of_schema], readers_schema, decoder)
end
#skip_array(writers_schema, decoder) ⇒ Object
483 484 485 |
# File 'lib/avro/io.rb', line 483
# Advances +decoder+ past a block-encoded array without building it.
# skip_blocks is not shown here; it presumably calls the block once per
# encoded item (TODO confirm). Each item is skipped with the writer's item
# schema.
def skip_array(writers_schema, decoder)
  skip_blocks(decoder) do
    skip_data(writers_schema.items, decoder)
  end
end
#skip_data(writers_schema, decoder) ⇒ Object
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 |
# File 'lib/avro/io.rb', line 435
# Advances +decoder+ past one datum encoded with +writers_schema+ without
# materializing it. read_record uses this for writer fields that have no
# counterpart in the reader's schema.
#
# @raise [AvroError] if the writer's schema type is not recognized
def skip_data(writers_schema, decoder)
  case writers_schema.type_sym
  when :null    then decoder.skip_null
  when :boolean then decoder.skip_boolean
  when :string  then decoder.skip_string
  when :int     then decoder.skip_int
  when :long    then decoder.skip_long
  when :float   then decoder.skip_float
  when :double  then decoder.skip_double
  when :bytes   then decoder.skip_bytes
  when :fixed   then skip_fixed(writers_schema, decoder)
  when :enum    then skip_enum(writers_schema, decoder)
  when :array   then skip_array(writers_schema, decoder)
  when :map     then skip_map(writers_schema, decoder)
  when :union   then skip_union(writers_schema, decoder)
  when :record, :error, :request then skip_record(writers_schema, decoder)
  else
    raise AvroError, "Unknown schema type: #{writers_schema.type}"
  end
end
#skip_enum(_writers_schema, decoder) ⇒ Object
474 475 476 |
# File 'lib/avro/io.rb', line 474
# Skips an enum value, which is encoded as a single int index. The writer's
# schema is not needed for this.
def skip_enum(_writers_schema, decoder)
  decoder.skip_int
end
#skip_fixed(writers_schema, decoder) ⇒ Object
470 471 472 |
# File 'lib/avro/io.rb', line 470
# Skips a fixed-size value by advancing the decoder writers_schema.size bytes.
def skip_fixed(writers_schema, decoder)
  byte_count = writers_schema.size
  decoder.skip(byte_count)
end
#skip_map(writers_schema, decoder) ⇒ Object
487 488 489 490 491 492 |
# File 'lib/avro/io.rb', line 487
# Advances +decoder+ past a block-encoded map without building it.
# skip_blocks is not shown here; it presumably calls the block once per
# encoded entry (TODO confirm). Each entry is a string key followed by a value
# of the writer's value schema.
def skip_map(writers_schema, decoder)
  skip_blocks(decoder) do
    decoder.skip_string
    skip_data(writers_schema.values, decoder)
  end
end
#skip_record(writers_schema, decoder) ⇒ Object
494 495 496 |
# File 'lib/avro/io.rb', line 494
# Skips a record by skipping each of the writer's fields in encoded order.
def skip_record(writers_schema, decoder)
  writers_schema.fields.each do |writer_field|
    skip_data(writer_field.type, decoder)
  end
end
#skip_union(writers_schema, decoder) ⇒ Object
478 479 480 481 |
# File 'lib/avro/io.rb', line 478
# Skips a union value: reads the long branch index, then skips the datum
# encoded with that branch of +writers_schema+.
#
# @raise [AvroError] if the encoded branch index is outside the union
def skip_union(writers_schema, decoder)
  index = decoder.read_long
  branches = writers_schema.schemas
  # The index comes from the data, so validate it. A negative index would
  # otherwise silently pick a branch counted from the end of the Array, and
  # a too-large one would hand nil to skip_data.
  unless (0...branches.size).cover?(index)
    raise AvroError, "Can't access branch index #{index} for union with #{branches.size} branches"
  end

  skip_data(branches[index], decoder)
end