Class: LogStash::Codecs::Fluent

Inherits:
Base show all
Defined in:
lib/logstash/codecs/fluent.rb

Overview

This codec handles fluentd’s msgpack schema.

For example, you can receive logs from fluent-logger-ruby with:

input {
  tcp {
    codec => fluent
    port => 4000
  }
}

And from your ruby code in your own application:

logger = Fluent::Logger::FluentLogger.new(nil, :host => "example.log", :port => 4000)
logger.post("some_tag", { "your" => "data", "here" => "yay!" })

Notes:

  • the fluent uses a second-precision time for events, so you will never see subsecond precision on events processed by this codec.

Constant Summary

Constants included from LogStash::Config::Mixin

LogStash::Config::Mixin::CONFIGSORT

Instance Attribute Summary

Attributes included from LogStash::Config::Mixin

#config, #original_params

Attributes inherited from Plugin

#logger, #params

Instance Method Summary collapse

Methods inherited from Base

#clone, #flush, #initialize, #on_event, #teardown

Methods included from LogStash::Config::Mixin

#config_init, included

Methods inherited from Plugin

#eql?, #finished, #finished?, #hash, #initialize, #inspect, lookup, #reload, #running?, #shutdown, #teardown, #terminating?, #to_s

Constructor Details

This class inherits a constructor from LogStash::Codecs::Base

Instance Method Details

#decode(data) ⇒ Object



37
38
39
40
41
42
43
44
45
46
# File 'lib/logstash/codecs/fluent.rb', line 37

def decode(data)
  @decoder.feed(data)
  @decoder.each do |tag, epochtime, map|
    event = LogStash::Event.new(map.merge(
      "@timestamp" => Time.at(epochtime),
      "tags" => tag
    ))
    yield event
  end
end

#encode(event) ⇒ Object



49
50
51
52
53
# File 'lib/logstash/codecs/fluent.rb', line 49

def encode(event)
  tag = event["tags"] || "log"
  epochtime = event["@timestamp"].to_i
  @on_event.call(MessagePack.pack([ tag, epochtime, event.to_hash ]))
end

#registerObject



31
32
33
34
# File 'lib/logstash/codecs/fluent.rb', line 31

def register
  require "msgpack"
  @decoder = MessagePack::Unpacker.new
end