Class: Fluent::Plugin::AvroturfSerializer

Inherits:
Formatter
  • Object
show all
Defined in:
lib/fluent/plugin/formatter_avroturf_serializer.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/fluent/plugin/formatter_avroturf_serializer.rb', line 42

def configure(conf)
  super
	if @schema_registry_url == nil || @schema_registry_api_key == nil || @schema_registry_api_secret ==nil
	  raise Fluent::ConfigError, "schema_registry_url, schema_registry_api_key and schema_registry_api_secret are mandatory"
	end
	@avro = AvroTurf::Messaging.new(
    registry_url: @schema_registry_url,
	  user: @schema_registry_api_key,
	  password: @schema_registry_api_secret,
	  schemas_path: @schemas_path,
	)
  if @format_as_json_when_encode_failed
    @dump_proc = Oj.method(:dump)
  end
  
  if @schema_subject 
	  raise Fluent::ConfigError, "schema_version is required when fetching schema with schema_subject" if @schema_version == nil
  end
end

#format(tag, time, record) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/fluent/plugin/formatter_avroturf_serializer.rb', line 62

def format(tag, time, record)
  begin
    encoded_data = @avro.encode(
      record, 
	    subject: @schema_subject, 
	    version: @schema_version, 
	    schema_name: @schema_name, 
	    schema_id: @schema_id, 
	    validate: @validate,
      namespace: @namespace
	  )
  rescue => e
    if @format_as_json_when_encode_failed
      log.debug "Encode failed. Format events as JSON instead.", :error => e.to_s, :error_class => e.class.to_s, :tag => tag
      "#{@dump_proc.call(record)}#{@newline}"
    end
  end
end