Class: LogStash::Filters::KubernetesMetadata

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/filters/kubernetes_metadata.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#lookup_cache ⇒ Object

Returns the value of attribute lookup_cache.



11
12
13
# File 'lib/logstash/filters/kubernetes_metadata.rb', line 11

# Reader for the +lookup_cache+ attribute.
#
# @return [Object, nil] the current value of @lookup_cache, or nil when it has
#   not been assigned yet
def lookup_cache
  defined?(@lookup_cache) ? @lookup_cache : nil
end

Instance Method Details

#filter(event) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/logstash/filters/kubernetes_metadata.rb', line 41

# Adds Kubernetes metadata to +event+, keyed off the log file path stored in
# the event field named by @source.
#
# Steps:
# 1. Read the path; tag the event with "_kubeparsefailure" and stop when it is
#    nil or empty.
# 2. Parse the file name with get_file_info; stop when it cannot be parsed.
# 3. Use the cached entry for the path when there is one, otherwise query the
#    API via get_kubernetes, derive log formats and cache the result.
# 4. Store the resulting hash in the event field named by @target.
#
# NOTE(review): the local variable names (`metadata`, `cached`, `file_info`)
# were missing from the rendered source and are reconstructed from how the
# values are used. The merge order on the cached path is a best guess.
# Confirm both against lib/logstash/filters/kubernetes_metadata.rb.
#
# @param event [#get, #set, #tag] the event being filtered
# @return [Object, nil] the result of filter_matched(event), or nil when the
#   event was skipped
def filter(event)
  @logger.debug("event is: #{event}")
  path = event.get(@source)

  # Ensure that the path parameter has been defined so that we can find the required metadata
  if path.nil? || path.empty?
    event.tag("_kubeparsefailure")
    return
  end

  @logger.debug("Log entry has source field, beginning processing for Kubernetes")

  metadata = {}
  cached = lookup_cache[path]
  file_info = get_file_info(path)

  # If we were unable to extract metadata from the file name, return
  return unless file_info

  if cached
    metadata = file_info.merge(cached)
  else
    @logger.debug("Trying to get kubernetes file info, it was not cached")
    @logger.debug("kubernetes file info got: #{file_info}")

    pod = file_info['pod']
    namespace = file_info['namespace']
    name = file_info['container_name']

    return unless pod && namespace && name

    # Only cache successful lookups so a failed API call is retried on the
    # next event for the same path.
    if (data = get_kubernetes(namespace, pod))
      metadata = file_info.merge(data)
      set_log_formats(metadata)
      lookup_cache[path] = metadata
    end
    @logger.debug("metadata within lookup_cache[path]: #{metadata}")
  end

  @logger.debug("metadata after unless lookup_cache[path] is: #{metadata}")
  @logger.debug("config after unless lookup_cache[path] is: #{config}")

  event.set(@target, metadata)
  filter_matched(event)
end

#get_file_info(path) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/logstash/filters/kubernetes_metadata.rb', line 120

# Extracts Kubernetes identifiers from a container log file path.
#
# The last path component is expected to look like
# "<pod>_<namespace>_<container>-<id>.log": exactly three "_"-separated
# parts once a trailing ".log" is removed.
#
# @param path [String] path of the log file
# @return [Hash{String=>String}, nil] keys 'replication_controller', 'pod',
#   'namespace', 'container_name' and 'container_id'; nil when the file name
#   does not have three parts or the third part starts with "POD-"
def get_file_info(path)
  file_name = path.to_s.split(File::SEPARATOR).last
  # An empty path or a bare separator has no final component to parse.
  return nil unless file_name

  # Strip only a literal ".log" extension at the very end of the name. An
  # unescaped dot would also eat names that merely end in e.g. "blog".
  parts = file_name.sub(/\.log\z/, '').split('_')
  return nil if parts.length != 3 || parts[2].start_with?('POD-')

  pod, namespace, container = parts
  trailing_segment = /-[0-9a-z]*\z/

  {
    # The pod name with its last "-<suffix>" segment removed.
    'replication_controller' => pod.sub(trailing_segment, ''),
    'pod' => pod,
    'namespace' => namespace,
    'container_name' => container.sub(trailing_segment, ''),
    'container_id' => container.split('-').last
  }
end

#get_kubernetes(namespace, pod) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/logstash/filters/kubernetes_metadata.rb', line 147

# Fetches a pod's annotations and labels from the Kubernetes API.
#
# Builds the URL "<@api>/api/v1/namespaces/<namespace>/pods/<pod>", applies
# basic and/or bearer credentials from @auth when present, performs a GET and
# parses the JSON body.
#
# @param namespace [String] namespace of the pod
# @param pod [String] name of the pod
# @return [Hash{String=>Hash}, nil] a hash with 'annotations' and 'labels'
#   (keys passed through sanatize_keys), or nil when the request or the
#   parsing fails, so callers can rely on a truthiness check
def get_kubernetes(namespace, pod)
  @logger.debug("Attempting to query the Kubernetes API")

  url = [@api, 'api/v1/namespaces', namespace, 'pods', pod].join("/")

  # NOTE(review): TLS certificate verification is disabled here. Kept as in
  # the original, but consider making it configurable.
  rest_opts = { verify_ssl: false }

  if @auth
    if @auth['basic']
      @logger.debug("Found basic auth for Kubernetes API")

      basic_user = @auth['basic']['user']
      basic_pass = @auth['basic']['pass']

      rest_opts.merge!(user: basic_user, password: basic_pass)
    end

    if @auth['bearer']
      @logger.debug("Found Bearer  auth for Kubernetes API")

      bearer_key = @auth['bearer']['key']

      # Request headers must be nested under :headers. A top-level
      # :Authorization option is not sent with the request.
      rest_opts[:headers] = { Authorization: "Bearer #{bearer_key}" }
    end
  end

  # Never write credentials to the log.
  loggable_opts = rest_opts.reject { |key, _| key == :password || key == :headers }
  @logger.debug("rest_opts: #{loggable_opts}")

  begin
    response = RestClient::Resource.new(url, rest_opts).get
  rescue RestClient::ResourceNotFound
    @logger.warn("Kubernetes returned an error while querying the API")
    @logger.warn("url: #{url}, rest_opts: #{loggable_opts}")
    return nil
  rescue StandardError => e
    # StandardError rather than Exception, so signals and exits propagate.
    @logger.warn("Error while querying the API: #{e}")
    return nil
  end

  if response.code != 200
    @logger.warn("Non 200 response code returned: #{response.code}")
  end

  @logger.debug("response was: #{response}")

  data = LogStash::Json.load(response.body)

  {
    'annotations' => sanatize_keys(data['metadata']['annotations']),
    'labels' => sanatize_keys(data['metadata']['labels'])
  }
rescue StandardError => e
  @logger.warn("Unknown error while getting Kubernetes metadata: #{e}")
  # Return nil explicitly so the logger's return value is never mistaken for
  # metadata.
  nil
end

#register ⇒ Object



31
32
33
34
35
# File 'lib/logstash/filters/kubernetes_metadata.rb', line 31

# Plugin setup hook: creates the lookup cache unless one is already present.
#
# NOTE(review): the arguments 1000 and 900 are presumably a maximum size and a
# TTL in seconds. Confirm that LruRedux::ThreadSafeCache honours the second
# argument, as a TTL may require a different LruRedux class.
#
# @return [Object] whatever the final @logger.debug call returns
def register
  @logger.debug("Registering Kubernetes Filter plugin")
  unless lookup_cache
    self.lookup_cache = LruRedux::ThreadSafeCache.new(1000, 900)
  end
  @logger.debug("Created cache...")
end

#sanatize_keys(data) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/logstash/filters/kubernetes_metadata.rb', line 134

# Returns a copy of +data+ whose keys have "." and "," replaced by "_" and "/"
# replaced by "-". Values are carried over unchanged.
#
# @param data [Hash{String=>Object}, nil] mapping whose keys should be rewritten
# @return [Hash{String=>Object}] the rewritten mapping; empty when +data+ is
#   nil or false
def sanatize_keys(data)
  return {} unless data

  data.each_with_object({}) do |(key, value), cleaned|
    cleaned[key.tr('.,/', '__-')] = value
  end
end

#set_log_formats(metadata) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/logstash/filters/kubernetes_metadata.rb', line 88

# Chooses a log format for each output stream and stores it in +metadata+
# under 'log_format_stderr' and 'log_format_stdout'.
#
# For each stream the annotations are searched in this order; the first key
# with a truthy value wins, otherwise @default_log_format is used:
#   log-format-<stream>-<container_name>
#   log-format-<container_name>
#   log-format-<stream>
#   log-format
#
# Any error is logged as a warning and not raised.
#
# @param metadata [Hash] must provide 'container_name' and may provide
#   'annotations'; modified in place
# @return [Object] whatever the final logger call returns (callers ignore it)
def set_log_formats(metadata)
  # Missing annotations mean "use the defaults", not an error.
  annotations = metadata['annotations'] || {}
  container = metadata['container_name']

  formats = %w[stderr stdout].each_with_object({}) do |stream, chosen|
    candidates = [
      "log-format-#{stream}-#{container}",
      "log-format-#{container}",
      "log-format-#{stream}",
      "log-format"
    ]
    key = candidates.find { |candidate| annotations[candidate] }
    chosen[stream] = key ? annotations[key] : @default_log_format
  end

  metadata['log_format_stderr'] = formats['stderr']
  metadata['log_format_stdout'] = formats['stdout']
  @logger.debug("kubernetes metadata => #{metadata}")
rescue StandardError => e
  @logger.warn("Error setting log format: #{e}")
end