Class: LogStash::Codecs::AvroSchemaRegistry

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/codecs/avro_schema_registry.rb

Overview

Read serialized Avro records as Logstash events

This plugin is used to serialize Logstash events as Avro datums, as well as deserializing Avro datums into Logstash events.

Encoding

This codec currently does not encode. This might be added later.

Decoding

This codec is for deserializing individual Avro records. It looks up the associated avro schema from a Confluent schema registry. (github.com/confluentinc/schema-registry)

Usage

Example usage with Kafka input.

source,ruby

input {

kafka {
  codec => avro_schema_registry {
    endpoint => "http://schemas.example.com"
  }
}

} filter

...

output

...


Instance Method Summary collapse

Instance Method Details

#decode(data) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/logstash/codecs/avro_schema_registry.rb', line 71

def decode(data)
  if data.length < 5
    @logger.error('message is too small to decode')
  else
    datum = StringIO.new(data)
    magic_byte, schema_id = datum.read(5).unpack("cI>")
    if magic_byte != MAGIC_BYTE
      @logger.error('message does not start with magic byte')
    else
      schema = get_schema(schema_id)
      decoder = Avro::IO::BinaryDecoder.new(datum)
      datum_reader = Avro::IO::DatumReader.new(schema)
      yield LogStash::Event.new(datum_reader.read(decoder))
    end
  end
end

#encode(event) ⇒ Object



89
90
91
# File 'lib/logstash/codecs/avro_schema_registry.rb', line 89

def encode(event)
  @logger.error('Encode has not been implemented for this codec')
end

#get_schema(schema_id) ⇒ Object



63
64
65
66
67
68
# File 'lib/logstash/codecs/avro_schema_registry.rb', line 63

def get_schema(schema_id)
  if !@schemas.has_key?(schema_id)
    @schemas[schema_id] = Avro::Schema.parse(@client.schema(schema_id))
  end
  @schemas[schema_id]
end

#registerObject



58
59
60
61
# File 'lib/logstash/codecs/avro_schema_registry.rb', line 58

def register
  @client = SchemaRegistry::Client.new(endpoint, username, password)
  @schemas = Hash.new
end