Class: LogStash::Codecs::AvroSchemaRegistry
- Inherits:
-
Base
- Object
- Base
- LogStash::Codecs::AvroSchemaRegistry
- Defined in:
- lib/logstash/codecs/avro_schema_registry.rb
Overview
Read serialized Avro records as Logstash events
This plugin is used to serialize Logstash events as Avro datums, as well as deserializing Avro datums into Logstash events.
Encoding
This codec currently does not encode. This might be added later.
Decoding
This codec is for deserializing individual Avro records. It looks up the associated avro schema from a Confluent schema registry. (github.com/confluentinc/schema-registry)
Usage
Example usage with Kafka input.
- source,ruby
input {
kafka { codec => avro_schema_registry { endpoint => "http://schemas.example.com" } }
} filter
...
output
...
Instance Method Summary collapse
Instance Method Details
#decode(data) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/logstash/codecs/avro_schema_registry.rb', line 71 def decode(data) if data.length < 5 @logger.error('message is too small to decode') else datum = StringIO.new(data) magic_byte, schema_id = datum.read(5).unpack("cI>") if magic_byte != MAGIC_BYTE @logger.error('message does not start with magic byte') else schema = get_schema(schema_id) decoder = Avro::IO::BinaryDecoder.new(datum) datum_reader = Avro::IO::DatumReader.new(schema) yield LogStash::Event.new(datum_reader.read(decoder)) end end end |
#encode(event) ⇒ Object
89 90 91 |
# File 'lib/logstash/codecs/avro_schema_registry.rb', line 89 def encode(event) @logger.error('Encode has not been implemented for this codec') end |
#get_schema(schema_id) ⇒ Object
63 64 65 66 67 68 |
# File 'lib/logstash/codecs/avro_schema_registry.rb', line 63 def get_schema(schema_id) if !@schemas.has_key?(schema_id) @schemas[schema_id] = Avro::Schema.parse(@client.schema(schema_id)) end @schemas[schema_id] end |
#register ⇒ Object
58 59 60 61 |
# File 'lib/logstash/codecs/avro_schema_registry.rb', line 58 def register @client = SchemaRegistry::Client.new(endpoint, username, password) @schemas = Hash.new end |