Class: Avro::IO::DatumReader
- Inherits: Object
  - Object
    - Avro::IO::DatumReader
- Defined in:
- lib/avro/io.rb
Instance Attribute Summary
- #readers_schema ⇒ Object
  Returns the value of attribute readers_schema.
- #writers_schema ⇒ Object
  Returns the value of attribute writers_schema.
Class Method Summary
- .match_schemas(writers_schema, readers_schema) ⇒ Object
Instance Method Summary
- #initialize(writers_schema = nil, readers_schema = nil) ⇒ DatumReader (constructor)
  Returns a new instance of DatumReader.
- #read(decoder) ⇒ Object
- #read_array(writers_schema, readers_schema, decoder) ⇒ Object
- #read_data(writers_schema, readers_schema, decoder) ⇒ Object
- #read_default_value(field_schema, default_value) ⇒ Object
- #read_enum(writers_schema, readers_schema, decoder) ⇒ Object
- #read_fixed(writers_schema, readers_schema, decoder) ⇒ Object
- #read_map(writers_schema, readers_schema, decoder) ⇒ Object
- #read_record(writers_schema, readers_schema, decoder) ⇒ Object
- #read_union(writers_schema, readers_schema, decoder) ⇒ Object
- #skip_array(writers_schema, decoder) ⇒ Object
- #skip_data(writers_schema, decoder) ⇒ Object
- #skip_enum(writers_schema, decoder) ⇒ Object
- #skip_fixed(writers_schema, decoder) ⇒ Object
- #skip_map(writers_schema, decoder) ⇒ Object
- #skip_record(writers_schema, decoder) ⇒ Object
- #skip_union(writers_schema, decoder) ⇒ Object
Constructor Details
#initialize(writers_schema = nil, readers_schema = nil) ⇒ DatumReader
Returns a new instance of DatumReader.
245 246 247 248 |
# File 'lib/avro/io.rb', line 245
# Creates a datum reader.
#
# @param writers_schema the schema the data was written with (may be nil here).
# @param readers_schema the schema to resolve the data into; when it is
#   nil, #read adopts the writer's schema instead.
def initialize(writers_schema = nil, readers_schema = nil)
  @writers_schema, @readers_schema = writers_schema, readers_schema
end
Instance Attribute Details
#readers_schema ⇒ Object
Returns the value of attribute readers_schema.
243 244 245 |
# File 'lib/avro/io.rb', line 243
# The reader's (target) schema, or nil when none has been set yet.
def readers_schema
  @readers_schema
end
#writers_schema ⇒ Object
Returns the value of attribute writers_schema.
243 244 245 |
# File 'lib/avro/io.rb', line 243
# The schema the data was written with, or nil when none has been set.
def writers_schema
  @writers_schema
end
Class Method Details
.match_schemas(writers_schema, readers_schema) ⇒ Object
239 240 241 |
# File 'lib/avro/io.rb', line 239
# Decides whether data written with +writers_schema+ may be read with
# +readers_schema+. The decision is delegated entirely to
# Avro::SchemaCompatibility.match_schemas; callers such as #read_data treat
# the result as a boolean.
def self.match_schemas(writers_schema, readers_schema)
  compatibility = Avro::SchemaCompatibility
  compatibility.match_schemas(writers_schema, readers_schema)
end
Instance Method Details
#read(decoder) ⇒ Object
250 251 252 253 |
# File 'lib/avro/io.rb', line 250
# Decodes one datum from +decoder+.
#
# If no reader's schema has been supplied, the writer's schema is stored as
# the reader's schema first, so the datum is read without resolution.
def read(decoder)
  self.readers_schema ||= writers_schema
  read_data(writers_schema, readers_schema, decoder)
end
#read_array(writers_schema, readers_schema, decoder) ⇒ Object
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 |
# File 'lib/avro/io.rb', line 312
# Reads an array encoded as a series of blocks.
#
# Each block begins with a long item count, and a count of zero ends the
# array. A negative count means its absolute value is the item count and
# that a block byte size follows; that size is read and discarded because
# every item is decoded anyway.
#
# @return [Array] items decoded with +writers_schema.items+ and resolved
#   against +readers_schema.items+.
def read_array(writers_schema, readers_schema, decoder)
  items = []
  until (count = decoder.read_long).zero?
    if count.negative?
      count = -count
      decoder.read_long # block byte size, unused here
    end
    count.times { items << read_data(writers_schema.items, readers_schema.items, decoder) }
  end
  items
end
#read_data(writers_schema, readers_schema, decoder) ⇒ Object
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/avro/io.rb', line 255
# Decodes one value written with +writers_schema+ and resolves it against
# +readers_schema+.
#
# @raise [SchemaMatchException] when the schemas do not match, or when the
#   reader is a union and no branch matches a non-union writer.
# @raise [AvroError] when the writer's schema type is not recognised.
# @return the datum after passing through the reader schema's type adapter.
def read_data(writers_schema, readers_schema, decoder)
  matched = self.class.match_schemas(writers_schema, readers_schema)
  raise SchemaMatchException.new(writers_schema, readers_schema) unless matched

  # Resolution: a non-union writer read through a union reader is re-read
  # against the first reader branch that matches the writer.
  if writers_schema.type_sym != :union && readers_schema.type_sym == :union
    branch = readers_schema.schemas.find { |candidate| self.class.match_schemas(writers_schema, candidate) }
    raise SchemaMatchException.new(writers_schema, readers_schema) unless branch

    return read_data(writers_schema, branch, decoder)
  end

  # The writer's type determines the wire format, so dispatch on it.
  datum =
    case writers_schema.type_sym
    when :null    then decoder.read_null
    when :boolean then decoder.read_boolean
    when :string  then decoder.read_string
    when :int     then decoder.read_int
    when :long    then decoder.read_long
    when :float   then decoder.read_float
    when :double  then decoder.read_double
    when :bytes   then decoder.read_bytes
    when :fixed   then read_fixed(writers_schema, readers_schema, decoder)
    when :enum    then read_enum(writers_schema, readers_schema, decoder)
    when :array   then read_array(writers_schema, readers_schema, decoder)
    when :map     then read_map(writers_schema, readers_schema, decoder)
    when :union   then read_union(writers_schema, readers_schema, decoder)
    when :record, :error, :request
      read_record(writers_schema, readers_schema, decoder)
    else
      raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
    end

  readers_schema.type_adapter.decode(datum)
end
#read_default_value(field_schema, default_value) ⇒ Object
388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 |
# File 'lib/avro/io.rb', line 388
# Converts a JSON-decoded schema default into the value used for datums of
# +field_schema+.
#
# Numeric defaults are coerced with Integer()/Float(). Array and map
# defaults are converted element by element. Union defaults are
# interpreted against the union's first branch. For record defaults, a
# sub-field absent from +default_value+ falls back to that sub-field's own
# default.
#
# @param field_schema schema describing the default's type
# @param default_value the default as decoded from the schema JSON
# @raise [AvroError] for schema types this method does not handle
def read_default_value(field_schema, default_value)
  case field_schema.type_sym
  when :null
    nil
  when :boolean, :enum, :fixed, :string, :bytes
    default_value
  when :int, :long
    Integer(default_value)
  when :float, :double
    Float(default_value)
  when :array
    default_value.map { |json_val| read_default_value(field_schema.items, json_val) }
  when :map
    default_value.each_with_object({}) do |(key, json_val), read_map|
      read_map[key] = read_default_value(field_schema.values, json_val)
    end
  when :union
    read_default_value(field_schema.schemas[0], default_value)
  when :record, :error
    field_schema.fields.each_with_object({}) do |field, read_record|
      # Test key presence, not truthiness: a sub-field explicitly defaulted
      # to false (or null) must keep that value instead of being replaced
      # by the sub-field's own default.
      json_val = default_value.key?(field.name) ? default_value[field.name] : field.default
      read_record[field.name] = read_default_value(field.type, json_val)
    end
  else
    raise AvroError, "Unknown type: #{field_schema.type}"
  end
end
#read_enum(writers_schema, readers_schema, decoder) ⇒ Object
299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/avro/io.rb', line 299
# Reads an enum value encoded as an int index into the writer's symbols.
#
# Schema resolution: if the writer's symbol is not among the reader's
# symbols and the reader's schema exposes a non-nil +default+, that default
# is returned. Otherwise the writer's symbol is returned unchanged, which
# preserves this reader's lenient behaviour.
#
# @raise [AvroError] if the encoded index is negative or not below the
#   number of writer symbols (corrupt input or a schema mismatch).
# @return the writer's symbol, or the reader's default as described above.
def read_enum(writers_schema, readers_schema, decoder)
  index_of_symbol = decoder.read_int
  writer_symbols = writers_schema.symbols
  # A negative index would silently pick a symbol from the end of the list,
  # and one past the end would yield nil, so reject both explicitly.
  unless index_of_symbol >= 0 && index_of_symbol < writer_symbols.size
    raise AvroError, "Enum symbol index #{index_of_symbol} out of range (0...#{writer_symbols.size})"
  end

  read_symbol = writer_symbols[index_of_symbol]
  return read_symbol if readers_schema.symbols.include?(read_symbol)

  # Not every schema class defines an enum #default, so probe before
  # calling it.
  reader_default = readers_schema.default if readers_schema.respond_to?(:default)
  reader_default || read_symbol
end
#read_fixed(writers_schema, readers_schema, decoder) ⇒ Object
295 296 297 |
# File 'lib/avro/io.rb', line 295
# Reads a fixed value: exactly +writers_schema.size+ raw bytes from the
# decoder. The reader's schema does not affect decoding.
def read_fixed(writers_schema, readers_schema, decoder)
  byte_count = writers_schema.size
  decoder.read(byte_count)
end
#read_map(writers_schema, readers_schema, decoder) ⇒ Object
331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 |
# File 'lib/avro/io.rb', line 331
# Reads a map encoded as a series of blocks of (string key, value) entries.
#
# A block count of zero ends the map. A negative count means its absolute
# value is the entry count and that a block byte size follows; that size is
# read and discarded because every entry is decoded.
#
# @return [Hash] keys mapped to values decoded with +writers_schema.values+
#   and resolved against +readers_schema.values+.
def read_map(writers_schema, readers_schema, decoder)
  entries = {}
  until (count = decoder.read_long).zero?
    if count.negative?
      count = -count
      decoder.read_long # block byte size, unused here
    end
    count.times do
      key = decoder.read_string
      entries[key] = read_data(writers_schema.values, readers_schema.values, decoder)
    end
  end
  entries
end
#read_record(writers_schema, readers_schema, decoder) ⇒ Object
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 |
# File 'lib/avro/io.rb', line 358
# Reads a record written with +writers_schema+ and projects it onto
# +readers_schema+.
#
# Writer fields are consumed in wire order. Fields the reader also declares
# are decoded and resolved; the others are skipped. Reader fields that the
# writer lacks are then filled from the reader field's default.
#
# @return [Hash] field name => value. Decoded fields come first (writer
#   order), followed by defaulted fields (reader order).
# @raise [AvroError] when a reader-only field has no default.
def read_record(writers_schema, readers_schema, decoder)
  reader_fields = readers_schema.fields_hash
  record = {}

  writers_schema.fields.each do |writer_field|
    reader_field = reader_fields[writer_field.name]
    if reader_field
      record[writer_field.name] = read_data(writer_field.type, reader_field.type, decoder)
    else
      skip_data(writer_field.type, decoder)
    end
  end

  # If every reader field was matched by a writer field, nothing needs a default.
  return record unless reader_fields.size > record.size

  writer_fields = writers_schema.fields_hash
  reader_fields.each do |name, reader_field|
    next if writer_fields.key?(name)
    raise AvroError, "Missing data for #{reader_field.type} with no default" unless reader_field.default?

    record[reader_field.name] = read_default_value(reader_field.type, reader_field.default)
  end
  record
end
#read_union(writers_schema, readers_schema, decoder) ⇒ Object
351 352 353 354 355 356 |
# File 'lib/avro/io.rb', line 351
# Reads a union value: a long branch index into the writer's union,
# followed by the value encoded with that branch's schema. The value is
# resolved against the full reader's schema.
#
# @raise [AvroError] if the index is negative or not below the number of
#   writer branches (corrupt input or a writer/schema mismatch).
def read_union(writers_schema, readers_schema, decoder)
  index_of_schema = decoder.read_long
  branches = writers_schema.schemas
  # Without this guard a negative index silently selects a branch from the
  # end of the array, and an index past the end surfaces later as a
  # NoMethodError on nil.
  unless index_of_schema >= 0 && index_of_schema < branches.size
    raise AvroError, "Union branch index #{index_of_schema} out of range (0...#{branches.size})"
  end

  read_data(branches[index_of_schema], readers_schema, decoder)
end
#skip_array(writers_schema, decoder) ⇒ Object
480 481 482 |
# File 'lib/avro/io.rb', line 480
# Skips over an encoded array without building its items. The block
# framing is handled by skip_blocks (defined elsewhere in this class),
# which presumably yields once per item to skip.
def skip_array(writers_schema, decoder)
  skip_blocks(decoder) do
    skip_data(writers_schema.items, decoder)
  end
end
#skip_data(writers_schema, decoder) ⇒ Object
432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 |
# File 'lib/avro/io.rb', line 432
# Advances +decoder+ past one value written with +writers_schema+ without
# decoding it. Primitive types use the decoder's skip_* methods; complex
# types use the matching skip_* helper on this reader.
#
# @raise [AvroError] when the writer's schema type is not recognised.
def skip_data(writers_schema, decoder)
  case writers_schema.type_sym
  when :null    then decoder.skip_null
  when :boolean then decoder.skip_boolean
  when :string  then decoder.skip_string
  when :int     then decoder.skip_int
  when :long    then decoder.skip_long
  when :float   then decoder.skip_float
  when :double  then decoder.skip_double
  when :bytes   then decoder.skip_bytes
  when :fixed   then skip_fixed(writers_schema, decoder)
  when :enum    then skip_enum(writers_schema, decoder)
  when :array   then skip_array(writers_schema, decoder)
  when :map     then skip_map(writers_schema, decoder)
  when :union   then skip_union(writers_schema, decoder)
  when :record, :error, :request
    skip_record(writers_schema, decoder)
  else
    raise AvroError, "Unknown schema type: #{writers_schema.type}"
  end
end
#skip_enum(writers_schema, decoder) ⇒ Object
471 472 473 |
# File 'lib/avro/io.rb', line 471
# Skips an enum value, which is encoded as a single int symbol index; the
# schema is not consulted.
def skip_enum(writers_schema, decoder)
  decoder.skip_int
end
#skip_fixed(writers_schema, decoder) ⇒ Object
467 468 469 |
# File 'lib/avro/io.rb', line 467
# Skips a fixed value by advancing the decoder by the schema's declared
# size.
def skip_fixed(writers_schema, decoder)
  byte_count = writers_schema.size
  decoder.skip(byte_count)
end
#skip_map(writers_schema, decoder) ⇒ Object
484 485 486 487 488 489 |
# File 'lib/avro/io.rb', line 484
# Skips over an encoded map. For each entry yielded by skip_blocks (defined
# elsewhere in this class), the string key is skipped and then the value,
# using +writers_schema.values+.
def skip_map(writers_schema, decoder)
  skip_blocks(decoder) do
    decoder.skip_string # entry key
    skip_data(writers_schema.values, decoder)
  end
end
#skip_record(writers_schema, decoder) ⇒ Object
491 492 493 |
# File 'lib/avro/io.rb', line 491
# Skips a record by skipping each of the writer's fields in declaration
# order.
def skip_record(writers_schema, decoder)
  writers_schema.fields.each do |writer_field|
    skip_data(writer_field.type, decoder)
  end
end
#skip_union(writers_schema, decoder) ⇒ Object
475 476 477 478 |
# File 'lib/avro/io.rb', line 475
# Skips a union value: reads the long branch index, then skips a value of
# that writer branch's schema.
#
# @raise [AvroError] if the index is negative or not below the number of
#   writer branches (corrupt input or a writer/schema mismatch).
def skip_union(writers_schema, decoder)
  index = decoder.read_long
  branches = writers_schema.schemas
  # A negative index would silently skip using a branch from the end of the
  # array, and one past the end would fail later on nil.
  unless index >= 0 && index < branches.size
    raise AvroError, "Union branch index #{index} out of range (0...#{branches.size})"
  end

  skip_data(branches[index], decoder)
end