Class: Fluent::GelfInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_gelf.rb

Instance Method Summary collapse

Constructor Details

#initializeGelfInput

Returns a new instance of GelfInput.



13
14
15
16
17
# File 'lib/fluent/plugin/in_gelf.rb', line 13

def initialize
  super
  require 'fluent/plugin/socket_util'
  require 'gelfd2'
end

Instance Method Details

#configure(conf) ⇒ Object



42
43
44
45
46
47
# File 'lib/fluent/plugin/in_gelf.rb', line 42

def configure(conf)
  super

  @parser = Plugin.new_parser(@format)
  @parser.configure(conf)
end

#emit(time, record) ⇒ Object



112
113
114
115
116
# File 'lib/fluent/plugin/in_gelf.rb', line 112

def emit(time, record)
  router.emit(@tag, time, record)
rescue => e
  log.error 'gelf failed to emit', error: e.to_s, error_class: e.class.to_s, tag: @tag, record: Yajl.dump(record)
end

#listen(callback) ⇒ Object



101
102
103
104
105
106
107
108
109
110
# File 'lib/fluent/plugin/in_gelf.rb', line 101

def listen(callback)
  log.info "listening gelf socket on #{@bind}:#{@port} with #{@protocol_type}"
  if @protocol_type == :tcp
    Coolio::TCPServer.new(@bind, @port, SocketUtil::TcpHandler, log, "\n", callback)
  else
    @usock = SocketUtil.create_udp_socket(@bind)
    @usock.bind(@bind, @port)
    SocketUtil::UdpHandler.new(@usock, log, 8192, callback)
  end
end

#receive_data(data, addr) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/in_gelf.rb', line 71

def receive_data(data, addr)
  begin
    msg = Gelfd2::Parser.parse(data)
  rescue => e
    log.warn 'Gelfd failed to parse a message', error: e.to_s
    log.warn_backtrace
  end

  # Gelfd parser will return nil if it received and parsed a non-final chunk
  return if msg.nil?

  @parser.parse(msg) { |time, record|
    unless time && record
      log.warn "pattern not match: #{msg.inspect}"
      return
    end

    # Use the recorded event time if available
    time = record.delete('timestamp').to_i if record.key?('timestamp')

    # Postprocess recorded event
    strip_leading_underscore_(record) if @strip_leading_underscore

    emit(time, record)
  }
rescue => e
  log.error data.dump, error: e.to_s
  log.error_backtrace
end

#runObject



64
65
66
67
68
69
# File 'lib/fluent/plugin/in_gelf.rb', line 64

def run
  @loop.run(@blocking_timeout)
rescue
  log.error 'unexpected error', error: $!.to_s
  log.error_backtrace
end

#shutdownObject



57
58
59
60
61
62
# File 'lib/fluent/plugin/in_gelf.rb', line 57

def shutdown
  @loop.watchers.each { |w| w.detach }
  @loop.stop
  @handler.close
  @thread.join
end

#startObject



49
50
51
52
53
54
55
# File 'lib/fluent/plugin/in_gelf.rb', line 49

def start
  @loop = Coolio::Loop.new
  @handler = listen(method(:receive_data))
  @loop.attach(@handler)

  @thread = Thread.new(&method(:run))
end