Class: Fluent::Plugin::ECSMetadataFilter

Inherits:
Filter
  • Object
show all
Defined in:
lib/fluent/plugin/filter_ecs_metadata.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/fluent/plugin/filter_ecs_metadata.rb', line 21

def configure(conf)
  super

  require 'fluent_ecs'

  validate_params

  FluentECS.configure do |c|
    c.cache_size = @cache_size
    c.cache_ttl  = @cache_ttl < 0 ? :none : @cache_ttl
    c.fields     = @fields
  end

  FluentECS::Metadata.base_uri @ecs_agent_uri
  FluentECS::Task.base_uri @ecs_agent_uri

  @tag_regexp_compiled = Regexp.compile(@tag_regexp)
end

#filter_stream(tag, es) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/fluent/plugin/filter_ecs_metadata.rb', line 40

def filter_stream(tag, es)
  new_es   = Fluent::MultiEventStream.new
   = (tag)

  es.each do |time, record|
    if 
      record = merge_log_json(record) if merge_json_logs?
      if @fields_key.empty?
        record.merge! .to_h
      else
        record[@fields_key] = .to_h
      end
    end

    new_es.add(time, record)
  end

  new_es
end

#looks_like_json?(str) ⇒ Boolean

Returns:

  • (Boolean)


73
74
75
# File 'lib/fluent/plugin/filter_ecs_metadata.rb', line 73

def looks_like_json?(str)
  str.is_a?(String) && str[0] == '{' && str[-1] == '}'
end

#merge_json_logs?Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/fluent/plugin/filter_ecs_metadata.rb', line 77

def merge_json_logs?
  @merge_json_log
end

#merge_log_json(record) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/fluent/plugin/filter_ecs_metadata.rb', line 81

def merge_log_json(record)
  log = record['log']
  if looks_like_json?(log)
    begin
      record = JSON.parse(log).merge!(record)
      record.delete('log')
    rescue JSON::ParserError => e
      self.log.error(e)
    end
  end

  record
end

#metadata_for_tag(tag) ⇒ Object



65
66
67
68
69
70
71
# File 'lib/fluent/plugin/filter_ecs_metadata.rb', line 65

def (tag)
  match = tag.match(@tag_regexp_compiled)
  FluentECS::Container.find(match['docker_id']) unless match.nil?
rescue FluentECS::IntrospectError => e
  log.error(e)
  nil
end

#validate_paramsObject

Raises:

  • (Fluent::ConfigError)


60
61
62
63
# File 'lib/fluent/plugin/filter_ecs_metadata.rb', line 60

def validate_params
  bad_field = @fields.find { |f| !FluentECS::Container.method_defined?(f) }
  raise Fluent::ConfigError, "Invalid field: '#{bad_field}'" if bad_field
end