Class: LogStash::Codecs::CEF

Inherits:
Base
  • Object
Defined in:
lib/logstash/codecs/cef.rb

Overview

Implementation of a Logstash codec for the ArcSight Common Event Format (CEF), based on Revision 20 of Implementing ArcSight CEF, dated June 05, 2013: community.saas.hpe.com/dcvta86296/attachments/dcvta86296/connector-documentation/1116/1/CommonEventFormatv23.pdf

If this codec receives a payload from an input that is not a valid CEF message, then it will produce an event with the payload as the ‘message’ field and a ‘_cefparsefailure’ tag.

Constant Summary

HEADER_FIELDS =
['cefVersion','deviceVendor','deviceProduct','deviceVersion','deviceEventClassId','name','severity']
MAPPINGS =

Translating and flattening the CEF extensions with known field names as documented in the Common Event Format whitepaper

{
    "act" => "deviceAction",
    "app" => "applicationProtocol",
    "c6a1" => "deviceCustomIPv6Address1",
    "c6a1Label" => "deviceCustomIPv6Address1Label",
    "c6a2" => "deviceCustomIPv6Address2",
    "c6a2Label" => "deviceCustomIPv6Address2Label",
    "c6a3" => "deviceCustomIPv6Address3",
    "c6a3Label" => "deviceCustomIPv6Address3Label",
    "c6a4" => "deviceCustomIPv6Address4",
    "c6a4Label" => "deviceCustomIPv6Address4Label",
    "cat" => "deviceEventCategory",
    "cfp1" => "deviceCustomFloatingPoint1",
    "cfp1Label" => "deviceCustomFloatingPoint1Label",
    "cfp2" => "deviceCustomFloatingPoint2",
    "cfp2Label" => "deviceCustomFloatingPoint2Label",
    "cfp3" => "deviceCustomFloatingPoint3",
    "cfp3Label" => "deviceCustomFloatingPoint3Label",
    "cfp4" => "deviceCustomFloatingPoint4",
    "cfp4Label" => "deviceCustomFloatingPoint4Label",
    "cn1" => "deviceCustomNumber1",
    "cn1Label" => "deviceCustomNumber1Label",
    "cn2" => "deviceCustomNumber2",
    "cn2Label" => "deviceCustomNumber2Label",
    "cn3" => "deviceCustomNumber3",
    "cn3Label" => "deviceCustomNumber3Label",
    "cnt" => "baseEventCount",
    "cs1" => "deviceCustomString1",
    "cs1Label" => "deviceCustomString1Label",
    "cs2" => "deviceCustomString2",
    "cs2Label" => "deviceCustomString2Label",
    "cs3" => "deviceCustomString3",
    "cs3Label" => "deviceCustomString3Label",
    "cs4" => "deviceCustomString4",
    "cs4Label" => "deviceCustomString4Label",
    "cs5" => "deviceCustomString5",
    "cs5Label" => "deviceCustomString5Label",
    "cs6" => "deviceCustomString6",
    "cs6Label" => "deviceCustomString6Label",
    "dhost" => "destinationHostName",
    "dmac" => "destinationMacAddress",
    "dntdom" => "destinationNtDomain",
    "dpid" => "destinationProcessId",
    "dpriv" => "destinationUserPrivileges",
    "dproc" => "destinationProcessName",
    "dpt" => "destinationPort",
    "dst" => "destinationAddress",
    "duid" => "destinationUserId",
    "duser" => "destinationUserName",
    "dvc" => "deviceAddress",
    "dvchost" => "deviceHostName",
    "dvcpid" => "deviceProcessId",
    "end" => "endTime",
    "fname" => "fileName",
    "fsize" => "fileSize",
    "in" => "bytesIn",
    "msg" => "message",
    "out" => "bytesOut",
    "outcome" => "eventOutcome",
    "proto" => "transportProtocol",
    "request" => "requestUrl",
    "rt" => "deviceReceiptTime",
    "shost" => "sourceHostName",
    "smac" => "sourceMacAddress",
    "sntdom" => "sourceNtDomain",
    "spid" => "sourceProcessId",
    "spriv" => "sourceUserPrivileges",
    "sproc" => "sourceProcessName",
    "spt" => "sourcePort",
    "src" => "sourceAddress",
    "start" => "startTime",
    "suid" => "sourceUserId",
    "suser" => "sourceUserName",
    "ahost" => "agentHost",
    "art" => "agentReceiptTime",
    "at" => "agentType",
    "aid" => "agentId",
    "_cefVer" => "cefVersion",
    "agt" => "agentAddress",
    "av" => "agentVersion",
    "atz" => "agentTimeZone",
    "dtz" => "destinationTimeZone",
    "slong" => "sourceLongitude",
    "slat" => "sourceLatitude",
    "dlong" => "destinationLongitude",
    "dlat" => "destinationLatitude",
    "catdt" => "categoryDeviceType",
    "mrt" => "managerReceiptTime",
    "amac" => "agentMacAddress"
}
REVERSE_MAPPINGS =

Reverse mapping of CEF full field names to CEF extensions field names for encoding into a CEF event for output.

MAPPINGS.invert
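
As a minimal sketch of what the inversion yields, the snippet below applies `Hash#invert` to a three-entry subset of MAPPINGS. The constant names `SAMPLE_MAPPINGS` and `SAMPLE_REVERSE_MAPPINGS` are illustrative only and are not part of the codec.

```ruby
# A three-entry subset of MAPPINGS, used only for illustration.
SAMPLE_MAPPINGS = {
  "act" => "deviceAction",
  "dst" => "destinationAddress",
  "src" => "sourceAddress"
}

# Hash#invert swaps keys and values: full field name => abbreviated key.
SAMPLE_REVERSE_MAPPINGS = SAMPLE_MAPPINGS.invert

puts SAMPLE_REVERSE_MAPPINGS["destinationAddress"]  # prints: dst
```

`Hash#invert` keeps only one key when two entries share a value, so the inversion is lossless only while every full field name in the mapping is unique.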
HEADER_PATTERN =

A CEF Header is a sequence of zero or more:

- backslash-escaped pipes; OR
- backslash-escaped backslashes; OR
- non-pipe characters
/(?:\\\||\\\\|[^|])*?/
HEADER_SCANNER =

Cache of a scanner pattern that captures a HEADER followed by an unescaped pipe

/(#{HEADER_PATTERN})#{Regexp.quote('|')}/
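
The following sketch shows how the scanner can be applied repeatedly to peel header fields off the front of a line, mirroring the loop in `#handle`. The helper `scan_header_fields` and the sample line are assumptions made for illustration; only the two patterns are taken from this page.

```ruby
HEADER_PATTERN = /(?:\\\||\\\\|[^|])*?/
HEADER_SCANNER = /(#{HEADER_PATTERN})#{Regexp.quote('|')}/

# Hypothetical helper: capture up to `count` pipe-terminated header fields
# and return them together with whatever text is left over.
def scan_header_fields(line, count)
  fields = []
  rest = line
  count.times do
    match = HEADER_SCANNER.match(rest)
    break if match.nil? # fewer fields than expected
    fields << match[1]  # still escaped at this point
    rest = match.post_match
  end
  [fields, rest]
end

fields, rest = scan_header_fields('CEF:0|Acme|Fire\|wall|1.0|100|Login|5|src=10.0.0.1', 7)
p fields # the escaped pipe stays inside the third field: "Fire\\|wall"
p rest   # "src=10.0.0.1"
```

Because the escaped-pipe alternative is tried before the generic non-pipe alternative, `\|` is consumed as part of the field instead of ending it.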
HEADER_ESCAPE_CAPTURE =

Cache of a gsub pattern that matches a backslash-escaped backslash or backslash-escaped pipe, capturing the escaped character

/\\([\\|])/
EXTENSION_VALUE_ESCAPE_CAPTURE =

Cache of a gsub pattern that matches a backslash-escaped backslash or backslash-escaped equals, capturing the escaped character

/\\([\\=])/
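
A short sketch of how the two escape-capture patterns might be used with `String#gsub`, replacing each escape sequence with the captured character. The helper method names are hypothetical; the patterns and the `'\1'` replacement match what `#handle` uses.

```ruby
HEADER_ESCAPE_CAPTURE = /\\([\\|])/
EXTENSION_VALUE_ESCAPE_CAPTURE = /\\([\\=])/

# Hypothetical helper: unescape \| and \\ in a header field.
def unescape_header_value(value)
  value.gsub(HEADER_ESCAPE_CAPTURE, '\1')
end

# Hypothetical helper: unescape \= and \\ in an extension value.
def unescape_extension_value(value)
  value.gsub(EXTENSION_VALUE_ESCAPE_CAPTURE, '\1')
end

puts unescape_header_value('Fire\|wall')     # prints: Fire|wall
puts unescape_extension_value('a\=b')        # prints: a=b
puts unescape_extension_value('C:\\\\temp')  # prints: C:\temp
```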
EXTENSION_KEY_PATTERN =

While the original CEF spec calls out that extension keys must be alphanumeric and must not contain spaces, in practice many “CEF” producers like the ArcSight SmartConnector produce non-legal keys including underscores, commas, periods, and square-bracketed index offsets.

To support this, we look for a specific sequence of characters that is followed by an equals sign. This pattern will correctly identify all strictly-legal keys, and will also match those that include a dot “subkey”.

That sequence must begin with one or more `\w` (word: alphanumeric + underscore), which optionally may be followed by a “subkey” sequence consisting of a literal dot (`.`) followed by a non-whitespace character, then one or more word characters, and then one or more characters other than those that convey semantic meaning within CEF (literal-pipe (`|`), whitespace (`\s`), literal-dot (`.`), literal-equals (`=`), or literal-backslash (`\`)).

/(?:\w+(?:\.[^\s]\w+[^\|\s\.\=\\]+)?(?==))/
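
To make the key rules concrete, here is a small sketch that scans a sample extension string for everything this pattern treats as a key. The helper `extension_keys` and the sample input are assumptions for illustration.

```ruby
EXTENSION_KEY_PATTERN = /(?:\w+(?:\.[^\s]\w+[^\|\s\.\=\\]+)?(?==))/

# Hypothetical helper: list every substring that the pattern accepts as a key.
# The trailing lookahead means a candidate only counts when "=" follows it.
def extension_keys(text)
  text.scan(EXTENSION_KEY_PATTERN)
end

p extension_keys('src=10.0.0.1 ad.field_name=x msg=hello world')
# ["src", "ad.field_name", "msg"]
```

Words such as `hello` and `world` are not reported because no equals sign follows them, which is what later allows unescaped spaces inside values.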
EXTENSION_KEY_ARRAY_CAPTURE =

Some CEF extension keys seen in the wild use an undocumented array-like syntax that may not be compatible with the Event API’s strict-mode FieldReference parser (e.g., `fieldname[0]`). Cache of a `String#sub` pattern matching array-like syntax and capturing both the base field name and the array-indexing portion so we can convert to a valid FieldReference (e.g., `[fieldname][0]`).

/^([^\[\]]+)((?:\[[0-9]+\])+)$/
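
The conversion can be sketched as a single `String#sub` call, using the same `'[\1]\2'` replacement that `#handle` applies. The helper name `to_field_reference` is hypothetical.

```ruby
EXTENSION_KEY_ARRAY_CAPTURE = /^([^\[\]]+)((?:\[[0-9]+\])+)$/

# Hypothetical helper: wrap the base name in brackets and keep the index
# portion as-is; keys without a trailing "]" are returned unchanged.
def to_field_reference(key)
  return key unless key.end_with?(']')
  key.sub(EXTENSION_KEY_ARRAY_CAPTURE, '[\1]\2')
end

puts to_field_reference('fieldname[0]')  # prints: [fieldname][0]
puts to_field_reference('matrix[1][2]')  # prints: [matrix][1][2]
puts to_field_reference('src')           # prints: src
```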
EXTENSION_VALUE_PATTERN =

In extensions, spaces may be included in an extension value without any escaping, so an extension value is a sequence of zero or more:

  • non-whitespace character; OR

  • runs of whitespace that are NOT followed by something that looks like a key-equals sequence

/(?:\S|\s++(?!#{EXTENSION_KEY_PATTERN}=))*/
EXTENSION_KEY_VALUE_SCANNER =

Cache of a scanner pattern that captures extension field key/value pairs

/(#{EXTENSION_KEY_PATTERN})=(#{EXTENSION_VALUE_PATTERN})\s*/
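
A sketch of the scanner at work on a value that contains unescaped spaces. The three patterns are copied from this page; the helper `parse_extensions` and the sample input are assumptions, and the helper skips the key expansion and unescaping that `#handle` performs.

```ruby
EXTENSION_KEY_PATTERN = /(?:\w+(?:\.[^\s]\w+[^\|\s\.\=\\]+)?(?==))/
EXTENSION_VALUE_PATTERN = /(?:\S|\s++(?!#{EXTENSION_KEY_PATTERN}=))*/
EXTENSION_KEY_VALUE_SCANNER = /(#{EXTENSION_KEY_PATTERN})=(#{EXTENSION_VALUE_PATTERN})\s*/

# Hypothetical helper: collect raw key/value pairs into a Hash.
# String#scan yields one [key, value] pair per match of the scanner.
def parse_extensions(text)
  text.scan(EXTENSION_KEY_VALUE_SCANNER).to_h
end

p parse_extensions('src=10.0.0.1 msg=User logged in dpt=443')
# {"src"=>"10.0.0.1", "msg"=>"User logged in", "dpt"=>"443"}
```

The value for `msg` runs until the whitespace that precedes `dpt=`, since that is the first run of whitespace followed by something that looks like a key-equals sequence.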

Instance Method Summary

Constructor Details

#initialize(params = {}) ⇒ CEF

Returns a new instance of CEF.



# File 'lib/logstash/codecs/cef.rb', line 216

def initialize(params={})
  super(params)

  # CEF input MUST be UTF-8, per the CEF White Paper that serves as the format's specification:
  # https://web.archive.org/web/20160422182529/https://kc.mcafee.com/resources/sites/MCAFEE/content/live/CORP_KNOWLEDGEBASE/78000/KB78712/en_US/CEF_White_Paper_20100722.pdf
  @utf8_charset = LogStash::Util::Charset.new('UTF-8')
  @utf8_charset.logger = self.logger

  if @delimiter
    # Logstash configuration doesn't have built-in support for escaping,
    # so we implement it here. Feature discussion for escaping is here:
    #   https://github.com/elastic/logstash/issues/1645
    @delimiter = @delimiter.gsub("\\r", "\r").gsub("\\n", "\n")
    @buffer = FileWatch::BufferedTokenizer.new(@delimiter)
  end
end
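
The delimiter handling in the constructor can be tried in isolation. This sketch assumes the configured value arrives as the literal characters backslash-r and backslash-n; the helper name `unescape_delimiter` is hypothetical.

```ruby
# Hypothetical helper: turn the two-character sequences \r and \n, as typed
# in a configuration file, into real carriage-return and newline characters.
def unescape_delimiter(delimiter)
  delimiter.gsub("\\r", "\r").gsub("\\n", "\n")
end

configured = '\r\n'                    # four characters: \ r \ n
puts configured.length                 # prints: 4
puts unescape_delimiter(configured).length  # prints: 2
```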

Instance Method Details

#decode(data, &block) ⇒ Object



# File 'lib/logstash/codecs/cef.rb', line 234

def decode(data, &block)
  if @delimiter
    @buffer.extract(data).each do |line|
      handle(line, &block)
    end
  else
    handle(data, &block)
  end
end
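
To illustrate why a delimiter implies buffering, the sketch below uses a simplified stand-in for `FileWatch::BufferedTokenizer`. The class `SketchTokenizer` is an assumption written for this example; the real tokenizer's implementation may differ.

```ruby
# Simplified stand-in for a buffered tokenizer: it returns only complete,
# delimiter-terminated tokens and holds back any trailing partial token.
class SketchTokenizer
  def initialize(delimiter = "\n")
    @delimiter = delimiter
    @buffer = "".dup
  end

  def extract(data)
    @buffer << data
    tokens = @buffer.split(@delimiter, -1) # -1 keeps a trailing empty string
    @buffer = tokens.pop || "".dup         # last piece is incomplete so far
    tokens
  end
end

tokenizer = SketchTokenizer.new("\n")
p tokenizer.extract("CEF:0|a\nCEF:0|b")  # ["CEF:0|a"]
p tokenizer.extract("|c\n")              # ["CEF:0|b|c"]
```

With this behaviour a CEF message split across two reads is handed on only once its delimiter arrives. Without a delimiter, `#decode` passes each chunk straight to `#handle`.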

#encode(event) ⇒ Object



# File 'lib/logstash/codecs/cef.rb', line 313

def encode(event)
  # "CEF:0|Elasticsearch|Logstash|1.0|Signature|Name|Sev|"

  vendor = sanitize_header_field(event.sprintf(@vendor))
  vendor = self.class.get_config["vendor"][:default] if vendor == ""

  product = sanitize_header_field(event.sprintf(@product))
  product = self.class.get_config["product"][:default] if product == ""

  version = sanitize_header_field(event.sprintf(@version))
  version = self.class.get_config["version"][:default] if version == ""

  signature = sanitize_header_field(event.sprintf(@signature))
  signature = self.class.get_config["signature"][:default] if signature == ""

  name = sanitize_header_field(event.sprintf(@name))
  name = self.class.get_config["name"][:default] if name == ""

  severity = sanitize_severity(event, @severity)

  # Should also probably set the fields sent
  header = ["CEF:0", vendor, product, version, signature, name, severity].join("|")
  values = @fields.map {|fieldname| get_value(fieldname, event)}.compact.join(" ")

  @on_event.call(event, "#{header}|#{values}#{@delimiter}")
end
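
The shape of the encoded line can be sketched without Logstash. The codec's own `sanitize_header_field`, `sanitize_severity` and `get_value` are not shown on this page, so `escape_header_field` and `build_cef_line` below are hypothetical stand-ins that only escape backslashes and pipes in header fields and do not escape extension values.

```ruby
# Hypothetical stand-in: prefix backslashes and pipes with a backslash.
def escape_header_field(value)
  value.to_s.gsub(/[\\|]/) { |char| "\\#{char}" }
end

# Hypothetical stand-in: join header fields with pipes, extensions with
# spaces, and append the delimiter (if any) to the finished line.
def build_cef_line(header_fields, extensions, delimiter = "")
  header = ["CEF:0", *header_fields.map { |f| escape_header_field(f) }].join("|")
  values = extensions.map { |key, value| "#{key}=#{value}" }.join(" ")
  "#{header}|#{values}#{delimiter}"
end

puts build_cef_line(
  ["Elasticsearch", "Logstash", "1.0", "Signature", "Name", "6"],
  { "src" => "10.0.0.1" }
)
# prints: CEF:0|Elasticsearch|Logstash|1.0|Signature|Name|6|src=10.0.0.1
```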

#handle(data, &block) ⇒ Object



# File 'lib/logstash/codecs/cef.rb', line 244

def handle(data, &block)
  event = LogStash::Event.new
  event.set(raw_data_field, data) unless raw_data_field.nil?

  @utf8_charset.convert(data)

  # Several of the many operations in the rest of this method will fail when they encounter UTF8-tagged strings
  # that contain invalid byte sequences; fail early to avoid wasted work.
  fail('invalid byte sequence in UTF-8') unless data.valid_encoding?

  # Strip any quotations at the start and end, flex connectors seem to send this
  if data[0] == "\""
    data = data[1..-2]
  end

  # Use a scanning parser to capture the HEADER_FIELDS
  unprocessed_data = data
  HEADER_FIELDS.each do |field_name|
    match_data = HEADER_SCANNER.match(unprocessed_data)
    break if match_data.nil? # missing fields

    escaped_field_value = match_data[1]
    next if escaped_field_value.nil?

    # process legal header escape sequences
    unescaped_field_value = escaped_field_value.gsub(HEADER_ESCAPE_CAPTURE, '\1')

    event.set(field_name, unescaped_field_value)
    unprocessed_data = match_data.post_match
  end

  # Remainder is message
  message = unprocessed_data

  # Try and parse out the syslog header if there is one
  if event.get('cefVersion').include? ' '
    split_cef_version = event.get('cefVersion').rpartition(' ')
    event.set('syslog', split_cef_version[0])
    event.set('cefVersion', split_cef_version[2])
  end

  # Get rid of the CEF bit in the version
  event.set('cefVersion', event.get('cefVersion').sub(/^CEF:/, ''))

  # Use a scanning parser to capture the Extension Key/Value Pairs
  if message && message.include?('=')
    message = message.strip

    message.scan(EXTENSION_KEY_VALUE_SCANNER) do |extension_field_key, raw_extension_field_value|
      # expand abbreviated extension field keys
      extension_field_key = MAPPINGS.fetch(extension_field_key, extension_field_key)

      # convert extension field name to strict legal field_reference, fixing field names with ambiguous array-like syntax
      extension_field_key = extension_field_key.sub(EXTENSION_KEY_ARRAY_CAPTURE, '[\1]\2') if extension_field_key.end_with?(']')

      # process legal extension field value escapes
      extension_field_value = raw_extension_field_value.gsub(EXTENSION_VALUE_ESCAPE_CAPTURE, '\1')

      event.set(extension_field_key, extension_field_value)
    end
  end

  yield event
rescue => e
  @logger.error("Failed to decode CEF payload. Generating failure event with payload in message field.", :error => e.message, :backtrace => e.backtrace, :data => data)
  yield LogStash::Event.new("message" => data, "tags" => ["_cefparsefailure"])
end
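
The syslog-prefix step in `#handle` relies on `String#rpartition`. A minimal sketch of that step follows; the helper name `split_syslog_prefix` and the sample syslog header are assumptions for illustration.

```ruby
# Hypothetical helper: given the first header field, separate an optional
# syslog prefix from the CEF version and drop the leading "CEF:" marker.
def split_syslog_prefix(first_header_field)
  syslog = nil
  version = first_header_field
  if version.include?(' ')
    # rpartition splits at the LAST space, so the prefix may contain spaces.
    syslog, _separator, version = version.rpartition(' ')
  end
  [syslog, version.sub(/^CEF:/, '')]
end

p split_syslog_prefix('<134>Sep 19 08:26:10 host CEF:0')
# ["<134>Sep 19 08:26:10 host", "0"]
p split_syslog_prefix('CEF:0')
# [nil, "0"]
```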