Class: Avro::IO::DatumReader
- Inherits: Object
  (inheritance chain: Object → Avro::IO::DatumReader)
- Defined in: lib/avro/io.rb
Instance Attribute Summary
- #readers_schema ⇒ Object
  Returns the value of attribute readers_schema.
- #writers_schema ⇒ Object
  Returns the value of attribute writers_schema.
Class Method Summary
- .match_schemas(writers_schema, readers_schema) ⇒ Object
Instance Method Summary
- #initialize(writers_schema = nil, readers_schema = nil) ⇒ DatumReader
  Constructor: returns a new instance of DatumReader.
- #read(decoder) ⇒ Object
- #read_array(writers_schema, readers_schema, decoder) ⇒ Object
- #read_data(writers_schema, readers_schema, decoder) ⇒ Object
- #read_default_value(field_schema, default_value) ⇒ Object
- #read_enum(writers_schema, readers_schema, decoder) ⇒ Object
- #read_fixed(writers_schema, readers_schema, decoder) ⇒ Object
- #read_map(writers_schema, readers_schema, decoder) ⇒ Object
- #read_record(writers_schema, readers_schema, decoder) ⇒ Object
- #read_union(writers_schema, readers_schema, decoder) ⇒ Object
- #skip_array(writers_schema, decoder) ⇒ Object
- #skip_data(writers_schema, decoder) ⇒ Object
- #skip_enum(writers_schema, decoder) ⇒ Object
- #skip_fixed(writers_schema, decoder) ⇒ Object
- #skip_map(writers_schema, decoder) ⇒ Object
- #skip_record(writers_schema, decoder) ⇒ Object
- #skip_union(writers_schema, decoder) ⇒ Object
Constructor Details
#initialize(writers_schema = nil, readers_schema = nil) ⇒ DatumReader
Returns a new instance of DatumReader.
268 269 270 271 |
# File 'lib/avro/io.rb', line 268
# Creates a datum reader.
#
# @param writers_schema the schema the data was written with (defaults to nil).
#   NOTE(review): #read passes it on unconditionally, so it presumably must be
#   set before reading.
# @param readers_schema the schema to resolve the data into. When nil, #read
#   substitutes the writer's schema.
def initialize(writers_schema = nil, readers_schema = nil)
  @writers_schema, @readers_schema = writers_schema, readers_schema
end
Instance Attribute Details
#readers_schema ⇒ Object
Returns the value of attribute readers_schema.
266 267 268 |
# File 'lib/avro/io.rb', line 266
# The schema that data is resolved into. It is nil until it is supplied
# (through the constructor or the writer) or defaulted by #read.
def readers_schema
  # Either branch yields nil when the attribute was never assigned.
  defined?(@readers_schema) ? @readers_schema : nil
end
#writers_schema ⇒ Object
Returns the value of attribute writers_schema.
266 267 268 |
# File 'lib/avro/io.rb', line 266
# The schema the data was written with. It is nil until it is supplied.
def writers_schema
  # Either branch yields nil when the attribute was never assigned.
  defined?(@writers_schema) ? @writers_schema : nil
end
Class Method Details
.match_schemas(writers_schema, readers_schema) ⇒ Object
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 |
# File 'lib/avro/io.rb', line 223
# Decides whether data written with +writers_schema+ may be read with
# +readers_schema+ (the matching step of Avro schema resolution).
#
# As implemented here:
# * A union on either side always matches; the concrete branch is chosen
#   later, while reading.
# * When both types are the same:
#   - primitives and :request always match;
#   - named types (:record, :error, :enum) also need equal full names;
#   - :fixed additionally needs equal sizes;
#   - :map/:array compare only the *type* of their values/items (one level
#     deep, not recursively).
# * When the types differ, only numeric promotion matches:
#   int -> long/float/double, long -> float/double, float -> double.
#
# @return [Boolean]
def self.match_schemas(writers_schema, readers_schema)
  w_type = writers_schema.type_sym
  r_type = readers_schema.type_sym

  return true if w_type == :union || r_type == :union

  if w_type == r_type
    return true if Schema::PRIMITIVE_TYPES_SYM.include?(r_type)

    case r_type
    when :record, :error, :enum
      return writers_schema.fullname == readers_schema.fullname
    when :fixed
      return writers_schema.fullname == readers_schema.fullname &&
             writers_schema.size == readers_schema.size
    when :request
      return true
    when :map
      return writers_schema.values.type == readers_schema.values.type
    when :array
      return writers_schema.items.type == readers_schema.items.type
    end
  end

  # Numeric promotions: writer type => reader types it may be widened to.
  promotions = {
    :int   => [:long, :float, :double],
    :long  => [:float, :double],
    :float => [:double]
  }
  promotions.fetch(w_type, []).include?(r_type)
end
Instance Method Details
#read(decoder) ⇒ Object
273 274 275 276 |
# File 'lib/avro/io.rb', line 273
# Reads one datum from +decoder+.
#
# If no reader's schema has been set, the writer's schema is stored as the
# reader's schema (so later reads reuse it), and the data is read exactly as
# it was written.
#
# @param decoder source of the encoded data
# @return the datum produced by #read_data
def read(decoder)
  self.readers_schema ||= writers_schema
  read_data(writers_schema, readers_schema, decoder)
end
#read_array(writers_schema, readers_schema, decoder) ⇒ Object
335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 |
# File 'lib/avro/io.rb', line 335
# Reads an Avro array. The encoding is a sequence of blocks. Each block is a
# long item count followed by that many items, and a zero count ends the
# array. A negative count means a block size (a long) precedes the items; the
# item count is then the absolute value.
#
# @return [Array] the items, each read via #read_data using the writer's and
#   reader's item schemas
def read_array(writers_schema, readers_schema, decoder)
  read_items = []
  block_count = decoder.read_long
  while block_count != 0
    if block_count < 0
      block_count = -block_count
      # Block size: only useful for skipping, so consume it and discard it.
      decoder.read_long
    end
    block_count.times do
      read_items << read_data(writers_schema.items, readers_schema.items, decoder)
    end
    block_count = decoder.read_long
  end
  read_items
end
#read_data(writers_schema, readers_schema, decoder) ⇒ Object
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 |
# File 'lib/avro/io.rb', line 278
# Reads one value written with +writers_schema+ and resolves it against
# +readers_schema+.
#
# Steps:
# 1. Reject incompatible pairs (see .match_schemas).
# 2. If only the reader's schema is a union, retry against the first reader
#    branch that matches the writer's schema.
# 3. Otherwise dispatch on the writer's type. Primitives come straight from
#    the decoder; complex types go through the read_* helpers.
# 4. Pass the result through readers_schema.type_adapter.decode.
#
# @raise [SchemaMatchException] if the schemas do not match, or no branch of
#   a reader union matches the writer's schema
# @raise [AvroError] if the writer's type is not one handled below
#
# NOTE(review): when the writer's schema is a union, #read_union recurses back
# into this method with the same readers_schema. The value it returns has
# already been through readers_schema.type_adapter.decode, and step 4 applies
# the decode again. That is harmless for an identity adapter; confirm it for
# any other adapter.
def read_data(writers_schema, readers_schema, decoder)
  unless self.class.match_schemas(writers_schema, readers_schema)
    raise SchemaMatchException.new(writers_schema, readers_schema)
  end

  if writers_schema.type_sym != :union && readers_schema.type_sym == :union
    matching_branch = readers_schema.schemas.find { |candidate| self.class.match_schemas(writers_schema, candidate) }
    raise SchemaMatchException.new(writers_schema, readers_schema) unless matching_branch
    return read_data(writers_schema, matching_branch, decoder)
  end

  datum =
    case writers_schema.type_sym
    when :null    then decoder.read_null
    when :boolean then decoder.read_boolean
    when :string  then decoder.read_string
    when :int     then decoder.read_int
    when :long    then decoder.read_long
    when :float   then decoder.read_float
    when :double  then decoder.read_double
    when :bytes   then decoder.read_bytes
    when :fixed   then read_fixed(writers_schema, readers_schema, decoder)
    when :enum    then read_enum(writers_schema, readers_schema, decoder)
    when :array   then read_array(writers_schema, readers_schema, decoder)
    when :map     then read_map(writers_schema, readers_schema, decoder)
    when :union   then read_union(writers_schema, readers_schema, decoder)
    when :record, :error, :request then read_record(writers_schema, readers_schema, decoder)
    else
      raise AvroError, "Cannot read unknown schema type: #{writers_schema.type}"
    end

  readers_schema.type_adapter.decode(datum)
end
#read_default_value(field_schema, default_value) ⇒ Object
411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 |
# File 'lib/avro/io.rb', line 411
# Converts a field's JSON default value into a datum of +field_schema+'s type.
#
# Arrays, maps, unions and records are converted recursively. A union's
# default is converted using the union's first branch. For records, each
# field's value comes from +default_value+. When +default_value+ gives no
# value (nil) for a field, that field's own default is used instead.
#
# @param field_schema schema the default must conform to
# @param default_value the default, as parsed from JSON
# @return the converted datum
# @raise [AvroError] if +field_schema+'s type is not handled here
# @raise [ArgumentError, TypeError] from Integer()/Float() when a numeric
#   default cannot be converted
def read_default_value(field_schema, default_value)
  case field_schema.type_sym
  when :null
    nil
  when :boolean, :enum, :fixed, :string, :bytes
    default_value
  when :int, :long
    Integer(default_value)
  when :float, :double
    Float(default_value)
  when :array
    default_value.map { |json_val| read_default_value(field_schema.items, json_val) }
  when :map
    map_datum = {}
    default_value.each do |key, json_val|
      map_datum[key] = read_default_value(field_schema.values, json_val)
    end
    map_datum
  when :union
    read_default_value(field_schema.schemas[0], default_value)
  when :record, :error
    record = {}
    field_schema.fields.each do |field|
      json_val = default_value[field.name]
      # Fall back only on nil. A truthiness test (`unless json_val`) would also
      # discard an explicit `false` and replace it with the field's own default.
      json_val = field.default if json_val.nil?
      record[field.name] = read_default_value(field.type, json_val)
    end
    record
  else
    raise AvroError, "Unknown type: #{field_schema.type}"
  end
end
#read_enum(writers_schema, readers_schema, decoder) ⇒ Object
322 323 324 325 326 327 328 329 330 331 332 333 |
# File 'lib/avro/io.rb', line 322
# Reads an enum value. The encoded int is an index into the writer's symbol
# list, and the symbol at that index is returned.
#
# TODO(jmhodges): figure out what unset means for resolution.
# NOTE(review): the writer's symbol is returned even when the reader's enum
# does not declare it. The Avro spec's resolution rules call for the reader's
# default or an error in that case. The empty
# `unless readers_schema.symbols.include?(...)` placeholder that stood here did
# nothing, so it has been removed; readers_schema is currently unused.
def read_enum(writers_schema, readers_schema, decoder)
  index_of_symbol = decoder.read_int
  writers_schema.symbols[index_of_symbol]
end
#read_fixed(writers_schema, readers_schema, decoder) ⇒ Object
318 319 320 |
# File 'lib/avro/io.rb', line 318
# Reads a fixed value by requesting writers_schema.size units from
# decoder.read (presumably bytes).
# The reader's schema is not consulted: .match_schemas already requires equal
# sizes for fixed types.
def read_fixed(writers_schema, readers_schema, decoder)
  byte_count = writers_schema.size
  decoder.read(byte_count)
end
#read_map(writers_schema, readers_schema, decoder) ⇒ Object
354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 |
# File 'lib/avro/io.rb', line 354
# Reads an Avro map. The encoding is a sequence of blocks. Each block is a
# long entry count followed by that many (string key, value) entries, and a
# zero count ends the map. A negative count means a block size (a long)
# precedes the entries; the entry count is then the absolute value.
#
# @return [Hash] key => value, each value read via #read_data using the
#   writer's and reader's value schemas
def read_map(writers_schema, readers_schema, decoder)
  read_items = {}
  block_count = decoder.read_long
  while block_count != 0
    if block_count < 0
      block_count = -block_count
      # Block size: only useful for skipping, so consume it and discard it.
      decoder.read_long
    end
    block_count.times do
      key = decoder.read_string
      read_items[key] = read_data(writers_schema.values, readers_schema.values, decoder)
    end
    block_count = decoder.read_long
  end
  read_items
end
#read_record(writers_schema, readers_schema, decoder) ⇒ Object
381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 |
# File 'lib/avro/io.rb', line 381
# Reads a record written with +writers_schema+ into a Hash keyed by field
# name, resolved against +readers_schema+.
#
# * Writer fields are consumed in the writer's order. Fields the reader also
#   declares are read via #read_data; all others are skipped via #skip_data.
# * Reader fields that the writer lacks are filled from the reader field's
#   default via #read_default_value. A reader field whose default is nil is
#   left out of the result (see the FIXME below).
#
# @return [Hash] field name => datum
def read_record(writers_schema, readers_schema, decoder)
  reader_fields = readers_schema.fields_hash
  record = {}

  writers_schema.fields.each do |writer_field|
    reader_field = reader_fields[writer_field.name]
    if reader_field
      record[writer_field.name] = read_data(writer_field.type, reader_field.type, decoder)
    else
      skip_data(writer_field.type, decoder)
    end
  end

  # Every reader field was supplied by the writer: nothing left to default.
  return record unless reader_fields.size > record.size

  writer_fields = writers_schema.fields_hash
  reader_fields.each do |name, reader_field|
    next if writer_fields.has_key?(name)
    # FIXME(jmhodges) another 'unset' here: a nil default leaves the field absent.
    next if reader_field.default.nil?
    record[reader_field.name] = read_default_value(reader_field.type, reader_field.default)
  end

  record
end
#read_union(writers_schema, readers_schema, decoder) ⇒ Object
374 375 376 377 378 379 |
# File 'lib/avro/io.rb', line 374
# Reads a union value. The encoding is a long index selecting one of the
# writer's branch schemas, followed by the value encoded with that branch.
# The value is then read (and resolved against +readers_schema+) by #read_data.
def read_union(writers_schema, readers_schema, decoder)
  branch = writers_schema.schemas[decoder.read_long]
  read_data(branch, readers_schema, decoder)
end
#skip_array(writers_schema, decoder) ⇒ Object
503 504 505 |
# File 'lib/avro/io.rb', line 503
# Skips an encoded array by skipping each of its items with the writer's item
# schema.
# Block framing is delegated to skip_blocks (not shown here).
# NOTE(review): presumably skip_blocks yields once per item; confirm.
def skip_array(writers_schema, decoder)
  skip_blocks(decoder) do
    skip_data(writers_schema.items, decoder)
  end
end
#skip_data(writers_schema, decoder) ⇒ Object
455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 |
# File 'lib/avro/io.rb', line 455
# Advances +decoder+ past one value encoded with +writers_schema+.
#
# Primitive types use the decoder's matching skip_* method. Complex types use
# this reader's skip_* helpers.
#
# @raise [AvroError] for a schema type not listed below
def skip_data(writers_schema, decoder)
  case writers_schema.type_sym
  # primitives
  when :null    then decoder.skip_null
  when :boolean then decoder.skip_boolean
  when :string  then decoder.skip_string
  when :int     then decoder.skip_int
  when :long    then decoder.skip_long
  when :float   then decoder.skip_float
  when :double  then decoder.skip_double
  when :bytes   then decoder.skip_bytes
  # complex types
  when :fixed   then skip_fixed(writers_schema, decoder)
  when :enum    then skip_enum(writers_schema, decoder)
  when :array   then skip_array(writers_schema, decoder)
  when :map     then skip_map(writers_schema, decoder)
  when :union   then skip_union(writers_schema, decoder)
  when :record, :error, :request then skip_record(writers_schema, decoder)
  else
    raise AvroError, "Unknown schema type: #{writers_schema.type}"
  end
end
#skip_enum(writers_schema, decoder) ⇒ Object
494 495 496 |
# File 'lib/avro/io.rb', line 494
# Skips an enum value. Enums are encoded as an int index (see #read_enum), so
# the schema itself is not needed.
def skip_enum(writers_schema, decoder)
  decoder.skip_int
end
#skip_fixed(writers_schema, decoder) ⇒ Object
490 491 492 |
# File 'lib/avro/io.rb', line 490
# Skips a fixed value by asking the decoder to skip writers_schema.size units.
def skip_fixed(writers_schema, decoder)
  fixed_size = writers_schema.size
  decoder.skip(fixed_size)
end
#skip_map(writers_schema, decoder) ⇒ Object
507 508 509 510 511 512 |
# File 'lib/avro/io.rb', line 507
# Skips an encoded map by skipping each entry's string key and then its value,
# which is skipped with the writer's value schema.
# Block framing is delegated to skip_blocks (not shown here).
# NOTE(review): presumably skip_blocks yields once per entry; confirm.
def skip_map(writers_schema, decoder)
  skip_blocks(decoder) do
    decoder.skip_string                       # entry key
    skip_data(writers_schema.values, decoder) # entry value
  end
end
#skip_record(writers_schema, decoder) ⇒ Object
514 515 516 |
# File 'lib/avro/io.rb', line 514
# Skips a record by skipping each of the writer's fields, in the writer's
# field order.
def skip_record(writers_schema, decoder)
  writers_schema.fields.each do |field|
    skip_data(field.type, decoder)
  end
end
#skip_union(writers_schema, decoder) ⇒ Object
498 499 500 501 |
# File 'lib/avro/io.rb', line 498
# Skips a union value. The long branch index is consumed first, and then the
# value is skipped with the writer's branch schema at that index.
def skip_union(writers_schema, decoder)
  branch_schema = writers_schema.schemas[decoder.read_long]
  skip_data(branch_schema, decoder)
end