Class: AvroTurf::Messaging
- Inherits:
- Object
- Object
- AvroTurf::Messaging
- Defined in:
- lib/avro_turf/messaging.rb
Overview
Provides a way to encode and decode messages without having to embed schemas in the encoded data. Confluent’s Schema Registry is used to register a schema when encoding a message – the registry will issue a schema id that will be included in the encoded data alongside the actual message. When decoding the data, the schema id will be used to look up the writer’s schema from the registry.
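The framing described above can be sketched in plain Ruby, without Avro or a registry. `frame` is a hypothetical helper, not part of AvroTurf, and the payload is an arbitrary string standing in for the Avro-encoded message body. The layout (one magic byte, then the schema id as a 4-byte big-endian integer, then the message) follows the `#encode` source on this page.

```ruby
# Same value as AvroTurf::Messaging::MAGIC_BYTE, redefined so the sketch is standalone.
MAGIC_BYTE = [0].pack("C").freeze

# Hypothetical helper: prefixes a payload with the magic byte and schema id.
def frame(schema_id, payload)
  # "N" packs a 32-bit unsigned integer in big-endian (network) byte order.
  # String#b returns a binary copy so the concatenation never mixes encodings.
  MAGIC_BYTE + [schema_id].pack("N") + payload.b
end

framed = frame(42, "hi")
p framed.bytes # => [0, 0, 0, 0, 42, 104, 105]
```

The fixed header is therefore 5 bytes, whatever the size of the schema itself.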
Defined Under Namespace
Classes: DecodedMessage
Constant Summary
- MAGIC_BYTE = [0].pack("C").freeze
Instance Method Summary
- #decode(data, schema_name: nil, namespace: @namespace) ⇒ Object
  Decodes data into the original message.
- #decode_message(data, schema_name: nil, namespace: @namespace) ⇒ Object
  Decodes data into the original message.
- #encode(message, schema_name: nil, namespace: @namespace, subject: nil, version: nil, schema_id: nil, validate: false, register_schemas: true) ⇒ Object
  Encodes a message using the specified schema.
- #fetch_schema(subject:, version: 'latest') ⇒ Object
  Fetches the schema for the given subject and version, which skips automatic registration of the schema on the schema registry.
- #fetch_schema_by_body(schema_name:, subject: nil, namespace: nil) ⇒ Object
- #fetch_schema_by_id(schema_id) ⇒ Object
  Fetch the schema from registry with the provided schema_id.
- #initialize(registry: nil, registry_url: nil, schema_context: nil, schema_store: nil, schemas_path: nil, namespace: nil, registry_path_prefix: nil, logger: nil, proxy: nil, user: nil, password: nil, ssl_ca_file: nil, client_cert: nil, client_key: nil, client_key_pass: nil, client_cert_data: nil, client_key_data: nil, connect_timeout: nil, resolv_resolver: nil) ⇒ Messaging
  constructor
  Instantiate a new Messaging instance with the given configuration.
- #register_schema(schema_name:, subject: nil, namespace: nil) ⇒ Object
  Schemas are registered under the full name of the top level Avro record type, or `subject` if it’s provided.
Constructor Details
#initialize(registry: nil, registry_url: nil, schema_context: nil, schema_store: nil, schemas_path: nil, namespace: nil, registry_path_prefix: nil, logger: nil, proxy: nil, user: nil, password: nil, ssl_ca_file: nil, client_cert: nil, client_key: nil, client_key_pass: nil, client_cert_data: nil, client_key_data: nil, connect_timeout: nil, resolv_resolver: nil) ⇒ Messaging
Instantiate a new Messaging instance with the given configuration.
registry - A schema registry object that responds to all methods in the
AvroTurf::ConfluentSchemaRegistry interface.
registry_url - The String URL of the schema registry that should be used.
schema_context - Schema registry context name (optional).
schema_store - A schema store object that responds to #find(schema_name, namespace).
schemas_path - The String file system path where local schemas are stored.
namespace - The String default schema namespace.
registry_path_prefix - The String URL path prefix used to namespace schema registry requests (optional).
logger - The Logger that should be used to log information (optional).
proxy - Forward the request via proxy (optional).
user - User for basic auth (optional).
password - Password for basic auth (optional).
ssl_ca_file - Name of file containing CA certificate (optional).
client_cert - Name of file containing client certificate (optional).
client_key - Name of file containing client private key to go with client_cert (optional).
client_key_pass - Password to go with client_key (optional).
client_cert_data - In-memory client certificate (optional).
client_key_data - In-memory client private key to go with client_cert_data (optional).
connect_timeout - Timeout to use in the connection with the schema registry (optional).
resolv_resolver - Custom domain name resolver (optional).
# File 'lib/avro_turf/messaging.rb', line 61
def initialize(
  registry: nil,
  registry_url: nil,
  schema_context: nil,
  schema_store: nil,
  schemas_path: nil,
  namespace: nil,
  registry_path_prefix: nil,
  logger: nil,
  proxy: nil,
  user: nil,
  password: nil,
  ssl_ca_file: nil,
  client_cert: nil,
  client_key: nil,
  client_key_pass: nil,
  client_cert_data: nil,
  client_key_data: nil,
  connect_timeout: nil,
  resolv_resolver: nil
)
  @logger = logger || Logger.new($stderr)
  @namespace = namespace
  @schema_store = schema_store || SchemaStore.new(path: schemas_path || DEFAULT_SCHEMAS_PATH)
  @registry = registry || CachedConfluentSchemaRegistry.new(
    ConfluentSchemaRegistry.new(
      registry_url,
      schema_context: schema_context,
      logger: @logger,
      proxy: proxy,
      user: user,
      password: password,
      ssl_ca_file: ssl_ca_file,
      client_cert: client_cert,
      client_key: client_key,
      client_key_pass: client_key_pass,
      client_cert_data: client_cert_data,
      client_key_data: client_key_data,
      path_prefix: registry_path_prefix,
      connect_timeout: connect_timeout,
      resolv_resolver: resolv_resolver
    )
  )
  @schemas_by_id = {}
end
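Because `registry:` accepts any object with the right interface, a test double can stand in for a real schema registry. The sketch below is a hypothetical in-memory class, not part of AvroTurf. It covers only the four registry calls that appear in the source on this page (`fetch`, `register`, `check`, `subject_version`), and their return shapes are inferred from how this class uses them. The full AvroTurf::ConfluentSchemaRegistry interface may require more.

```ruby
# Hypothetical in-memory stand-in for a schema registry (not part of AvroTurf).
class InMemoryRegistry
  def initialize
    @schemas_by_id = {} # schema id => schema JSON string
    @versions = {}      # subject   => [schema ids], oldest first
  end

  # Registers the schema under the subject and returns its integer id.
  # Registering the same schema text again returns the existing id.
  def register(subject, schema)
    json = schema.to_s
    id = @schemas_by_id.key(json)
    if id.nil?
      id = @schemas_by_id.size + 1
      @schemas_by_id[id] = json
    end
    ids = (@versions[subject] ||= [])
    ids << id unless ids.include?(id)
    id
  end

  # Returns the schema JSON for an id. Assumption: raises KeyError when the
  # id is unknown, whereas Messaging rescues Excon::Error::NotFound.
  def fetch(id)
    @schemas_by_id.fetch(id)
  end

  # Returns a Hash containing "id" if the schema is registered under the
  # subject, or nil otherwise.
  def check(subject, schema)
    id = @schemas_by_id.key(schema.to_s)
    return nil unless id && @versions.fetch(subject, []).include?(id)

    { "subject" => subject, "id" => id, "schema" => schema.to_s }
  end

  # Returns a Hash with "id" and "schema" for a subject version.
  def subject_version(subject, version = "latest")
    ids = @versions.fetch(subject)
    index = version == "latest" ? ids.size - 1 : Integer(version) - 1
    raise KeyError, "unknown version #{version} for #{subject}" unless index.between?(0, ids.size - 1)

    id = ids[index]
    { "subject" => subject, "version" => index + 1, "id" => id, "schema" => @schemas_by_id.fetch(id) }
  end
end

registry = InMemoryRegistry.new
schema_json = '{"type":"string"}'
schema_id = registry.register("greeting", schema_json)
p registry.subject_version("greeting").fetch("id") == schema_id # => true
```

An instance of such a class could then be passed as `registry:` to the constructor. Since the stub raises KeyError rather than Excon::Error::NotFound, the SchemaNotFoundError translation in `#encode` and `#decode_message` would not apply to it.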
Instance Method Details
#decode(data, schema_name: nil, namespace: @namespace) ⇒ Object
Decodes data into the original message.
data - A String containing encoded data.
schema_name - The String name of the schema that should be used to decode
the data. Must match the schema used when encoding (optional).
namespace - The namespace of the schema (optional).
Returns the decoded message.
# File 'lib/avro_turf/messaging.rb', line 176
def decode(data, schema_name: nil, namespace: @namespace)
  decode_message(data, schema_name: schema_name, namespace: namespace).message
end
#decode_message(data, schema_name: nil, namespace: @namespace) ⇒ Object
Decodes data into the original message.
data - A String containing encoded data.
schema_name - The String name of the schema that should be used to decode
the data. Must match the schema used when encoding (optional).
namespace - The namespace of the schema (optional).
Returns a Struct with the following attributes:
schema_id - The integer id of the schema used to encode the message.
message - The decoded message.
# File 'lib/avro_turf/messaging.rb', line 190
def decode_message(data, schema_name: nil, namespace: @namespace)
  readers_schema = schema_name && @schema_store.find(schema_name, namespace)
  stream = StringIO.new(data)
  decoder = Avro::IO::BinaryDecoder.new(stream)

  # The first byte is MAGIC!!!
  magic_byte = decoder.read(1)

  if magic_byte != MAGIC_BYTE
    raise "Expected data to begin with a magic byte, got `#{magic_byte.inspect}`"
  end

  # The schema id is a 4-byte big-endian integer.
  schema_id = decoder.read(4).unpack("N").first

  writers_schema = @schemas_by_id.fetch(schema_id) do
    schema_json = @registry.fetch(schema_id)
    @schemas_by_id[schema_id] = Avro::Schema.parse(schema_json)
  end

  reader = Avro::IO::DatumReader.new(writers_schema, readers_schema)
  message = reader.read(decoder)

  DecodedMessage.new(schema_id, writers_schema, readers_schema, message)
rescue Excon::Error::NotFound
  raise SchemaNotFoundError.new("Schema with id: #{schema_id} is not found on registry")
end
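The header handling at the start of `#decode_message` can be tried in isolation. The sketch below uses only the standard library. `split_frame` is a hypothetical helper, not part of AvroTurf, and it returns the remaining bytes undecoded instead of passing them to an Avro reader. Raising ArgumentError and checking for a truncated header are choices made for this sketch, not the library's behavior.

```ruby
require "stringio"

# Same value as AvroTurf::Messaging::MAGIC_BYTE, redefined so the sketch is standalone.
MAGIC_BYTE = [0].pack("C").freeze

# Hypothetical helper: splits framed data into [schema_id, remaining_bytes].
def split_frame(data)
  stream = StringIO.new(data.b)

  # The first byte must be the magic byte.
  magic_byte = stream.read(1)
  unless magic_byte == MAGIC_BYTE
    raise ArgumentError, "Expected data to begin with a magic byte, got `#{magic_byte.inspect}`"
  end

  # The next four bytes hold the schema id as a big-endian unsigned integer.
  header = stream.read(4)
  raise ArgumentError, "Data is too short to contain a schema id" if header.nil? || header.bytesize < 4

  [header.unpack1("N"), stream.read]
end

schema_id, body = split_frame([0, 0, 0, 1, 44, 1, 2].pack("C*"))
p schema_id  # => 300
p body.bytes # => [1, 2]
```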
#encode(message, schema_name: nil, namespace: @namespace, subject: nil, version: nil, schema_id: nil, validate: false, register_schemas: true) ⇒ Object
Encodes a message using the specified schema.
message - The message that should be encoded. Must be compatible with
the schema.
schema_name - The String name of the schema that should be used to encode
the data (optional).
namespace - The namespace of the schema (optional).
subject - The subject name the schema should be registered under in
the schema registry (optional).
version - The integer version of the schema that should be used to encode
the data. Only used together with subject (optional).
schema_id - The integer id of the schema that should be used to encode
the data (optional).
validate - The boolean that enables complete message validation before
encoding. An Avro::SchemaValidator::ValidationError with
a descriptive message is raised if the message is invalid.
register_schemas - The boolean that indicates whether the schema should be
registered if it does not already exist (true), or only looked up
in the registry without registering it (false).
Returns the encoded data as a String.
# File 'lib/avro_turf/messaging.rb', line 128
def encode(message, schema_name: nil, namespace: @namespace, subject: nil, version: nil, schema_id: nil, validate: false, register_schemas: true)
  schema, schema_id = if schema_id
    fetch_schema_by_id(schema_id)
  elsif subject && version
    fetch_schema(subject: subject, version: version)
  elsif schema_name && !register_schemas
    fetch_schema_by_body(subject: subject, schema_name: schema_name, namespace: namespace)
  elsif schema_name
    register_schema(subject: subject, schema_name: schema_name, namespace: namespace)
  else
    raise ArgumentError.new('Neither schema_name nor schema_id nor subject + version provided to determine the schema.')
  end

  if validate
    Avro::SchemaValidator.validate!(schema, message, recursive: true, encoded: false, fail_on_extra_fields: true)
  end

  stream = StringIO.new
  writer = Avro::IO::DatumWriter.new(schema)
  encoder = Avro::IO::BinaryEncoder.new(stream)

  # Always start with the magic byte.
  encoder.write(MAGIC_BYTE)

  # The schema id is encoded as a 4-byte big-endian integer.
  encoder.write([schema_id].pack("N"))

  # The actual message comes last.
  writer.write(message, encoder)

  stream.string
rescue Excon::Error::NotFound
  if schema_id
    raise SchemaNotFoundError.new("Schema with id: #{schema_id} is not found on registry")
  else
    raise SchemaNotFoundError.new("Schema with subject: `#{subject}` version: `#{version}` is not found on registry")
  end
end
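The order in which `#encode` picks a schema source matters when several keyword arguments are given at once. The sketch below mirrors only that branch order. `schema_lookup_strategy` is a hypothetical helper, not part of AvroTurf, which returns the name of the method that would be called instead of calling it.

```ruby
# Hypothetical helper: reports which lookup #encode would use for the given
# keyword arguments, following the if/elsif order in the #encode source.
def schema_lookup_strategy(schema_name: nil, subject: nil, version: nil, schema_id: nil, register_schemas: true)
  if schema_id
    :fetch_schema_by_id    # an explicit id wins over everything else
  elsif subject && version
    :fetch_schema          # subject + version never registers anything
  elsif schema_name && !register_schemas
    :fetch_schema_by_body  # look up the local schema without registering it
  elsif schema_name
    :register_schema       # default: register the local schema if needed
  else
    raise ArgumentError, "Neither schema_name nor schema_id nor subject + version provided to determine the schema."
  end
end

p schema_lookup_strategy(schema_name: "person", schema_id: 7)        # => :fetch_schema_by_id
p schema_lookup_strategy(schema_name: "person", subject: "people")   # => :register_schema
p schema_lookup_strategy(subject: "people", version: 2)              # => :fetch_schema
```

Note that `subject` without `version` does not select a lookup on its own. With a `schema_name` it only changes the subject passed to the lookup or registration, and without one the call raises ArgumentError.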
#fetch_schema(subject:, version: 'latest') ⇒ Object
Fetches the schema from the registry for the given subject name and version. Determining the schema this way skips automatic registration of the schema on the schema registry.
# File 'lib/avro_turf/messaging.rb', line 221
def fetch_schema(subject:, version: 'latest')
  schema_data = @registry.subject_version(subject, version)
  schema_id = schema_data.fetch('id')
  schema = Avro::Schema.parse(schema_data.fetch('schema'))
  [schema, schema_id]
end
#fetch_schema_by_body(schema_name:, subject: nil, namespace: nil) ⇒ Object
# File 'lib/avro_turf/messaging.rb', line 237
def fetch_schema_by_body(schema_name:, subject: nil, namespace: nil)
  schema = @schema_store.find(schema_name, namespace)
  schema_data = @registry.check(subject || schema.fullname, schema)
  raise SchemaNotFoundError.new("Schema with structure: #{schema} not found on registry") unless schema_data
  [schema, schema_data["id"]]
end
#fetch_schema_by_id(schema_id) ⇒ Object
Fetch the schema from registry with the provided schema_id.
# File 'lib/avro_turf/messaging.rb', line 229
def fetch_schema_by_id(schema_id)
  schema = @schemas_by_id.fetch(schema_id) do
    schema_json = @registry.fetch(schema_id)
    Avro::Schema.parse(schema_json)
  end
  [schema, schema_id]
end
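One detail worth noticing: `Hash#fetch` with a block returns the block's value on a miss but does not store it. As written, `#fetch_schema_by_id` therefore reads `@schemas_by_id` without filling it, while `#decode_message` assigns inside the block. The sketch below shows the difference with plain hashes. The lambda is a hypothetical stand-in for the registry call plus schema parsing. The default registry is wrapped in CachedConfluentSchemaRegistry, so this says nothing about how many HTTP requests are actually made.

```ruby
lookups = 0
# Hypothetical stand-in for `@registry.fetch` followed by `Avro::Schema.parse`.
fetch_from_registry = lambda do |schema_id|
  lookups += 1
  "schema-#{schema_id}"
end

# Pattern used in #fetch_schema_by_id: the block result is returned, not stored.
unstored = {}
2.times { unstored.fetch(7) { fetch_from_registry.call(7) } }
lookups_without_store = lookups

# Pattern used in #decode_message: the block assigns into the hash.
stored = {}
2.times { stored.fetch(7) { stored[7] = fetch_from_registry.call(7) } }
lookups_with_store = lookups - lookups_without_store

p lookups_without_store # => 2
p lookups_with_store    # => 1
```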
#register_schema(schema_name:, subject: nil, namespace: nil) ⇒ Object
Schemas are registered under the full name of the top level Avro record type, or `subject` if it’s provided.
# File 'lib/avro_turf/messaging.rb', line 247
def register_schema(schema_name:, subject: nil, namespace: nil)
  schema = @schema_store.find(schema_name, namespace)
  schema_id = @registry.register(subject || schema.fullname, schema)
  [schema, schema_id]
end