Class: Fluent::Plugin::AvroturfSerializer
- Inherits: Formatter
- Inheritance chain: Object → Formatter → Fluent::Plugin::AvroturfSerializer
- Defined in:
- lib/fluent/plugin/formatter_avroturf_serializer.rb
Instance Method Summary
Instance Method Details
#configure(conf) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/fluent/plugin/formatter_avroturf_serializer.rb', line 42
#
# Validates the plugin settings and builds the AvroTurf messaging client.
#
# Steps, in order:
# 1. Delegates to the parent class via +super+.
# 2. Requires the schema registry URL, API key and API secret to be set.
# 3. Creates an +AvroTurf::Messaging+ instance and stores it in +@avro+.
# 4. When +@format_as_json_when_encode_failed+ is truthy, stores
#    +Oj.method(:dump)+ in +@dump_proc+.
# 5. Requires +@schema_version+ whenever +@schema_subject+ is set.
#
# @param conf [Object] configuration object; passed through to +super+ and
#   not otherwise read here
# @raise [Fluent::ConfigError] if any registry credential is nil, or if
#   +@schema_subject+ is set while +@schema_version+ is nil
def configure(conf)
  super

  registry_settings = [@schema_registry_url, @schema_registry_api_key, @schema_registry_api_secret]
  if registry_settings.any?(&:nil?)
    raise Fluent::ConfigError, "schema_registry_url, schema_registry_api_key and schema_registry_api_secret are mandatory"
  end

  @avro = AvroTurf::Messaging.new(
    registry_url: @schema_registry_url,
    user: @schema_registry_api_key,
    password: @schema_registry_api_secret,
    schemas_path: @schemas_path
  )

  # The JSON dumper is only needed for the encode-failure fallback.
  @dump_proc = Oj.method(:dump) if @format_as_json_when_encode_failed

  if @schema_subject && @schema_version.nil?
    raise Fluent::ConfigError, "schema_version is required when fetching schema with schema_subject"
  end
end
#format(tag, time, record) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/fluent/plugin/formatter_avroturf_serializer.rb', line 62
#
# Encodes a record through the AvroTurf messaging client held in +@avro+.
#
# If encoding raises a StandardError and +@format_as_json_when_encode_failed+
# is truthy, the record is dumped with +@dump_proc+ and suffixed with
# +@newline+ instead. If the fallback is disabled, the failure is logged at
# warn level and nil is returned (the exception is not propagated).
#
# NOTE(review): +@newline+ is not assigned in the +configure+ shown for this
# class; presumably it is set elsewhere (e.g. by a config parameter) — confirm.
#
# @param tag [Object] event tag; only used in log output
# @param time [Object] event time; unused here
# @param record [Object] the event record to encode
# @return [Object, nil] the result of +@avro.encode+, the fallback string,
#   or nil when encoding failed and the fallback is disabled
def format(tag, time, record)
  @avro.encode(
    record,
    subject: @schema_subject,
    version: @schema_version,
    schema_name: @schema_name,
    schema_id: @schema_id,
    validate: @validate,
    namespace: @namespace
  )
rescue => e
  if @format_as_json_when_encode_failed
    log.debug "Encode failed. Format events as JSON instead.", :error => e.to_s, :error_class => e.class.to_s, :tag => tag
    "#{@dump_proc.call(record)}#{@newline}"
  else
    # Previously this path returned nil without leaving any trace of the error.
    log.warn "Encode failed and JSON fallback is disabled.", :error => e.to_s, :error_class => e.class.to_s, :tag => tag
    nil
  end
end