Class: Fluent::GelfInput
- Inherits: Input
- Inheritance hierarchy: Object → Input → Fluent::GelfInput
- Defined in:
- lib/fluent/plugin/in_gelf.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #emit(time, record) ⇒ Object
- #initialize ⇒ GelfInput (constructor): A new instance of GelfInput.
- #listen(callback) ⇒ Object
- #receive_data(data, addr) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ GelfInput
Returns a new instance of GelfInput.
13 14 15 16 17 |
# File 'lib/fluent/plugin/in_gelf.rb', line 13
# Creates the plugin instance and loads the libraries the plugin relies on.
#
# The requires are issued from the constructor (after the superclass
# initializer has run) rather than at file load time.
def initialize
  super
  %w[fluent/plugin/socket_util gelfd2].each { |library| require library }
end
Instance Method Details
#configure(conf) ⇒ Object
42 43 44 45 46 47 |
# File 'lib/fluent/plugin/in_gelf.rb', line 42
# Applies the configuration to the plugin and sets up the record parser.
#
# @param conf the configuration object; handed to the superclass and then to
#   the parser created for @format
# @return whatever the parser's +configure+ call returns
def configure(conf)
  super(conf)

  # The parser is selected by @format and configured with the same conf.
  @parser = Plugin.new_parser(@format)
  @parser.configure(conf)
end
#emit(time, record) ⇒ Object
112 113 114 115 116 |
# File 'lib/fluent/plugin/in_gelf.rb', line 112
# Forwards one event to the router under this plugin's @tag.
#
# A StandardError raised while emitting is not propagated; it is logged
# together with the tag and the record serialized via Yajl.
#
# @param time the event time to emit
# @param record the event record to emit
def emit(time, record)
  begin
    router.emit(@tag, time, record)
  rescue StandardError => failure
    log.error 'gelf failed to emit',
              error: failure.to_s,
              error_class: failure.class.to_s,
              tag: @tag,
              record: Yajl.dump(record)
  end
end
#listen(callback) ⇒ Object
101 102 103 104 105 106 107 108 109 110 |
# File 'lib/fluent/plugin/in_gelf.rb', line 101
# Creates the network handler that feeds incoming data to +callback+.
#
# For :tcp a Coolio::TCPServer is built; for any other @protocol_type a UDP
# socket is created, bound to @bind/@port and wrapped in a UdpHandler.
#
# @param callback the callable passed to the handler for received data
# @return the TCP server or UDP handler that was created
def listen(callback)
  log.info "listening gelf socket on #{@bind}:#{@port} with #{@protocol_type}"

  if @protocol_type == :tcp
    # NOTE(review): "\n" is presumably the message delimiter — confirm against TcpHandler.
    return Coolio::TCPServer.new(@bind, @port, SocketUtil::TcpHandler, log, "\n", callback)
  end

  @usock = SocketUtil.create_udp_socket(@bind)
  @usock.bind(@bind, @port)
  # NOTE(review): 8192 is presumably the maximum receive size — confirm against UdpHandler.
  SocketUtil::UdpHandler.new(@usock, log, 8192, callback)
end
#receive_data(data, addr) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/fluent/plugin/in_gelf.rb', line 71
# Handles one chunk of raw data received from the socket handler.
#
# The data is decoded with Gelfd2, the decoded message is run through
# @parser, and every parsed (time, record) pair is emitted. Errors are
# logged rather than raised.
#
# @param data the raw payload received from the network
# @param addr the sender address (not used by this method)
def receive_data(data, addr)
  msg =
    begin
      Gelfd2::Parser.parse(data)
    rescue => e
      log.warn 'Gelfd failed to parse a message', error: e.to_s
      log.warn_backtrace
      nil # a failed decode is treated the same as "no message yet"
    end

  # Gelfd parser will return nil if it received and parsed a non-final chunk
  return if msg.nil?

  @parser.parse(msg) do |event_time, record|
    if !event_time || !record
      log.warn "pattern not match: #{msg.inspect}"
      return
    end

    # Use the recorded event time if available.
    # NOTE(review): to_i drops any fractional part of the timestamp — confirm
    # that second resolution is intended.
    if record.key?('timestamp')
      event_time = record.delete('timestamp').to_i
    end

    # Postprocess recorded event
    strip_leading_underscore_(record) if @strip_leading_underscore

    emit(event_time, record)
  end
rescue => e
  log.error data.dump, error: e.to_s
  log.error_backtrace
end
#run ⇒ Object
64 65 66 67 68 69 |
# File 'lib/fluent/plugin/in_gelf.rb', line 64
# Runs the event loop with @blocking_timeout.
#
# Any StandardError escaping the loop is logged with its backtrace instead of
# being raised to the caller.
def run
  @loop.run(@blocking_timeout)
rescue => e
  log.error 'unexpected error', error: e.to_s
  log.error_backtrace
end
#shutdown ⇒ Object
57 58 59 60 61 62 |
# File 'lib/fluent/plugin/in_gelf.rb', line 57
# Stops the plugin: detaches every watcher from the event loop, stops the
# loop, closes the network handler and then waits for the worker thread.
def shutdown
  @loop.watchers.each(&:detach)
  @loop.stop
  @handler.close
  @thread.join
end
#start ⇒ Object
49 50 51 52 53 54 55 |
# File 'lib/fluent/plugin/in_gelf.rb', line 49
# Starts the plugin: creates the event loop, attaches the network handler
# (which delivers data to #receive_data) and runs the loop via #run on a new
# thread.
#
# @return [Thread] the thread executing #run
def start
  @loop = Coolio::Loop.new
  @handler = listen(method(:receive_data))
  @loop.attach(@handler)
  @thread = Thread.new { run }
end