Class: Fluent::TCPSocketClientInput
- Inherits: Input
- Inheritance chain: Object → Input → Fluent::TCPSocketClientInput
- Defined in:
- lib/fluent/plugin/in_tcp_socket_client.rb
Instance Method Summary
- #configure(conf) ⇒ Object
-
#initialize ⇒ TCPSocketClientInput
constructor
A new instance of TCPSocketClientInput.
- #multi_workers_ready? ⇒ Boolean
- #parse_msg(record) ⇒ Object
- #read_socket_messages ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ TCPSocketClientInput
Returns a new instance of TCPSocketClientInput.
18 19 20 21 |
# File 'lib/fluent/plugin/in_tcp_socket_client.rb', line 18
#
# Builds a new TCPSocketClientInput.
#
# Runs the parent initializer first, then loads Ruby's socket library so
# that TCPSocket is available by the time the plugin starts reading.
def initialize
  super
  # Loaded here rather than at file level, so the library is only pulled in
  # when an instance of this plugin is actually created.
  require 'socket'
end
Instance Method Details
#configure(conf) ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/fluent/plugin/in_tcp_socket_client.rb', line 23
#
# Applies the plugin configuration and loads the parser library that the
# configured @format needs.
#
# @param conf the configuration object, handed unchanged to the parent
#   implementation via +super+
# @return [Object] the result of +require+ when a parser library is loaded,
#   otherwise nil
def configure(conf)
  super
  log.info "server has been set : #{@server}:#{@port}"

  # Only 'json', 'ltsv' and 'msgpack' need an extra library; any other
  # format (including 'text') loads nothing.
  parser_lib =
    case @format
    when 'json' then 'oj'
    when 'ltsv' then 'ltsv'
    when 'msgpack' then 'msgpack'
    end
  require parser_lib if parser_lib
end
#multi_workers_ready? ⇒ Boolean
38 39 40 |
# File 'lib/fluent/plugin/in_tcp_socket_client.rb', line 38
#
# Always reports true.
# NOTE(review): presumably this is the Fluentd hook that declares the plugin
# safe to run under multiple workers — confirm against the Fluentd plugin API.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
#parse_msg(record) ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/fluent/plugin/in_tcp_socket_client.rb', line 72
#
# Converts one raw message into a record according to @format.
#
# @param record [String] a single message with its delimiter already removed
# @return [Object] whatever the selected parser returns; for 'text' a Hash
#   with the raw message under the "message" key; for any other format an
#   empty Hash
def parse_msg(record)
  case @format
  when 'json'
    # NOTE(review): Oj.load runs with Oj's default mode. If that is :object,
    # parsing data read from a socket can instantiate arbitrary objects —
    # consider Oj.safe_load or an explicit strict mode. Confirm before changing.
    Oj.load(record)
  when 'ltsv'
    # NOTE(review): LTSV.parse may return an Array of hashes rather than a
    # single Hash — verify what downstream consumers expect.
    LTSV.parse(record)
  when 'msgpack'
    MessagePack.unpack(record)
  when 'text'
    { 'message' => record }
  else
    # Unrecognised formats produce an empty record instead of raising.
    {}
  end
end
#read_socket_messages ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/fluent/plugin/in_tcp_socket_client.rb', line 47
#
# Opens a TCP connection to @server:@port, reads @delimiter-terminated
# messages until the peer stops sending, and emits them in batches.
#
# Each message has its delimiter stripped, is parsed with #parse_msg and is
# added to the current event stream stamped with the current time in whole
# seconds. After every @emit_messages messages the stream is emitted under
# +tag+ and a fresh one is started. Whatever is left over when reading ends
# (normally or because of an error) is emitted at the end.
#
# Errors raised while connecting, reading or parsing are logged, not
# re-raised. The socket is always closed before the method returns.
#
# @return [Object, nil] the result of the final emit, or nil when nothing
#   was left to emit
def read_socket_messages
  es = MultiEventStream.new
  log.trace "Creating socket to #{@server}:#{@port}"

  socket = nil
  begin
    socket = TCPSocket.open(@server, @port)
    count = 0
    while (line = socket.gets(@delimiter))
      es.add(Time.now.to_i, parse_msg(line.chomp(@delimiter)))
      count += 1
      next unless (count % @emit_messages).zero?

      # A record was added just above, so the stream cannot be empty here.
      router.emit_stream(tag, es)
      es = MultiEventStream.new
    end
  rescue StandardError => e
    # StandardError only: signals and SystemExit must be allowed to propagate.
    log.error e
  ensure
    # Release the connection on every path; this method runs repeatedly and
    # would otherwise leak one socket per invocation.
    socket.close if socket
  end

  # Flush the partial batch that did not reach @emit_messages.
  router.emit_stream(tag, es) unless es.empty?
end
#start ⇒ Object
42 43 44 45 |
# File 'lib/fluent/plugin/in_tcp_socket_client.rb', line 42
#
# Starts the plugin: runs the parent start hook, then registers a repeating
# timer named :read_socket_run whose callback is #read_socket_messages and
# whose interval argument is @interval.
#
# @return [Object] whatever timer_execute returns
def start
  super
  reader = method(:read_socket_messages)
  timer_execute(:read_socket_run, @interval, repeat: true, &reader)
end