Class: Fluent::TCPSocketClientInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_tcp_socket_client.rb

Instance Method Summary collapse

Constructor Details

#initializeTCPSocketClientInput

Returns a new instance of TCPSocketClientInput.



18
19
20
21
# File 'lib/fluent/plugin/in_tcp_socket_client.rb', line 18

def initialize
  super
  require 'socket'
end

Instance Method Details

#configure(conf) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/fluent/plugin/in_tcp_socket_client.rb', line 23

def configure(conf)
  super

  log.info "server has been set : #{@server}:#{@port}"

  case @format
  when 'json'
    require 'oj'
  when 'ltsv'
    require 'ltsv'
  when 'msgpack'
    require 'msgpack'
  end
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


38
39
40
# File 'lib/fluent/plugin/in_tcp_socket_client.rb', line 38

def multi_workers_ready?
  true
end

#parse_msg(record) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/fluent/plugin/in_tcp_socket_client.rb', line 72

def parse_msg(record)
  parsed_record = {}
  case @format
  when 'json'
    parsed_record = Oj.load(record)
  when 'ltsv'
    parsed_record = LTSV.parse(record)
  when 'msgpack'
    parsed_record = MessagePack.unpack(record)
  when 'text'
    parsed_record["message"] = record
  end
  parsed_record
end

#read_socket_messagesObject



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/fluent/plugin/in_tcp_socket_client.rb', line 47

def read_socket_messages
   es = MultiEventStream.new
   log.trace "Creating socket to #{@server}:#{@port}"
   begin
    socket = TCPSocket.open(@server, @port)
    count=0
    while message = socket.gets(@delimiter)
      es.add(Time.now.to_i, parse_msg(message.chomp(@delimiter)))
      count+=1
      if (count % @emit_messages) == 0
        unless es.empty?
          router.emit_stream(tag, es)
        end
        es = MultiEventStream.new
      end
    end
   rescue Exception => e
     $log.error e
   end

    unless es.empty?
       router.emit_stream(tag, es)
    end
end

#startObject



42
43
44
45
# File 'lib/fluent/plugin/in_tcp_socket_client.rb', line 42

def start
  super
  timer_execute(:read_socket_run, @interval, repeat: true, &method(:read_socket_messages))
end