Class: Avromatic::Messaging
- Inherits:
-
AvroTurf::Messaging
- Object
- AvroTurf::Messaging
- Avromatic::Messaging
- Defined in:
- lib/avromatic/messaging.rb
Overview
Subclass AvroTurf::Messaging to use a custom DatumReader and DatumWriter
Instance Attribute Summary collapse
-
#registry ⇒ Object
readonly
Returns the value of attribute registry.
Instance Method Summary collapse
- #decode(data, schema_name: nil, namespace: @namespace) ⇒ Object
- #encode(message, schema_name: nil, namespace: @namespace, subject: nil) ⇒ Object
Instance Attribute Details
#registry ⇒ Object (readonly)
Returns the value of attribute registry.
9 10 11 |
# File 'lib/avromatic/messaging.rb', line 9
# Read-only accessor for the registry held by this messaging instance.
#
# @return [Object] the current value of the +@registry+ instance variable
def registry
  @registry
end
Instance Method Details
#decode(data, schema_name: nil, namespace: @namespace) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/avromatic/messaging.rb', line 11
# Decodes a payload framed as: one magic byte, a 4-byte big-endian schema id,
# then the datum itself.
#
# MAGIC_BYTE is not defined in this listing; presumably it is inherited from
# AvroTurf::Messaging (confirm against the parent class).
#
# @param data [String] the framed payload to decode
# @param schema_name [String, nil] name of the reader's schema; when nil no
#   lookup is performed and nil is handed to the datum reader as the reader's
#   schema
# @param namespace [String, nil] namespace passed to the schema store lookup
# @return [Object] whatever the datum reader's +read+ returns
# @raise [RuntimeError] if the first byte of +data+ is not MAGIC_BYTE
def decode(data, schema_name: nil, namespace: @namespace)
  readers_schema = schema_name && @schema_store.find(schema_name, namespace)
  stream = StringIO.new(data)
  decoder = Avro::IO::BinaryDecoder.new(stream)

  # The first byte is MAGIC!!!
  magic_byte = decoder.read(1)
  raise "Expected data to begin with a magic byte, got `#{magic_byte.inspect}`" if magic_byte != MAGIC_BYTE

  # The schema id is a 4-byte big-endian integer.
  schema_id = decoder.read(4).unpack1('N')

  # Parsed writer's schemas are cached by id, so the registry is only
  # consulted the first time a given schema id is seen.
  writers_schema = @schemas_by_id.fetch(schema_id) do
    schema_json = @registry.fetch(schema_id)
    @schemas_by_id[schema_id] = Avro::Schema.parse(schema_json)
  end

  # The following line differs from the parent class to use a custom DatumReader
  reader_class = Avromatic.use_custom_datum_reader ? Avromatic::IO::DatumReader : Avro::IO::DatumReader
  reader = reader_class.new(writers_schema, readers_schema)
  reader.read(decoder)
end
#encode(message, schema_name: nil, namespace: @namespace, subject: nil) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/avromatic/messaging.rb', line 35
# Encodes +message+ framed as: one magic byte, a 4-byte big-endian schema id,
# then the datum written by the datum writer.
#
# MAGIC_BYTE is not defined in this listing; presumably it is inherited from
# AvroTurf::Messaging (confirm against the parent class).
#
# @param message [Object] the datum handed to the datum writer
# @param schema_name [String, nil] name used to look up the schema in the
#   schema store
# @param namespace [String, nil] namespace passed to the schema store lookup
# @param subject [String, nil] subject to register the schema under; when nil
#   the schema's full name is used
# @return [String] the encoded bytes accumulated in the output stream
def encode(message, schema_name: nil, namespace: @namespace, subject: nil)
  schema = @schema_store.find(schema_name, namespace)

  # Schemas are registered under the full name of the top level Avro record
  # type, or `subject` if it's provided.
  schema_id = @registry.register(subject || schema.fullname, schema)

  stream = StringIO.new
  encoder = Avro::IO::BinaryEncoder.new(stream)

  # Always start with the magic byte.
  encoder.write(MAGIC_BYTE)

  # The schema id is encoded as a 4-byte big-endian integer.
  encoder.write([schema_id].pack('N'))

  # The following line differs from the parent class to use a custom DatumWriter
  writer_class = Avromatic.use_custom_datum_writer ? Avromatic::IO::DatumWriter : Avro::IO::DatumWriter
  writer = writer_class.new(schema)

  # The actual message comes last.
  writer.write(message, encoder)

  stream.string
end