Class: Fluent::Compat::SocketUtil::TcpHandler

Inherits:
Coolio::Socket
  • Object
show all
Defined in:
lib/fluent/compat/socket_util.rb

Constant Summary collapse

PEERADDR_FAILED =
["?", "?", "name resolution failed", "?"]

Instance Method Summary collapse

Constructor Details

#initialize(io, log, delimiter, callback) ⇒ TcpHandler

Returns a new instance of TcpHandler.



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/fluent/compat/socket_util.rb', line 57

# Builds a handler around an accepted IO object.
#
# @param io the IO object handed to the superclass constructor
# @param log logger; must respond to +trace+ (and +error+, used by on_read)
# @param delimiter separator used by on_read to split buffered data into messages
# @param callback object responding to +call(msg, addr)+, invoked once per message
def initialize(io, log, delimiter, callback)
  super(io)
  @timeout = 0

  # Peer address and linger setup only happen for TCP sockets; for any other
  # IO type @addr is left unset (nil).
  if io.is_a?(TCPSocket)
    @addr =
      begin
        io.peeraddr
      rescue
        # Fall back to a placeholder tuple when the peer address lookup fails.
        PEERADDR_FAILED
      end

    # Packed as two native ints: { int l_onoff; int l_linger; }
    linger = [1, @timeout.to_i].pack('I!I!')
    io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)
  end

  @log = log
  @delimiter = delimiter
  @callback = callback
  @log.trace { "accepted fluent socket object_id=#{self.object_id}" }
  # Receive buffer is kept as raw bytes.
  @buffer = "".force_encoding('ASCII-8BIT')
end

Instance Method Details

#on_close ⇒ Object



91
92
93
# File 'lib/fluent/compat/socket_util.rb', line 91

# Close hook: emits a trace-level log line identifying this handler by its
# object id. The message is built lazily inside the block passed to +trace+.
def on_close
  @log.trace do
    "closed fluent socket object_id=#{self.object_id}"
  end
end

#on_connect ⇒ Object



73
74
# File 'lib/fluent/compat/socket_util.rb', line 73

# Connect hook: intentionally a no-op for this handler.
#
# @return [nil]
def on_connect
  nil
end

#on_read(data) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/fluent/compat/socket_util.rb', line 76

# Appends +data+ to the internal buffer and hands every complete,
# delimiter-terminated message to the callback as +call(msg, @addr)+.
# Bytes after the last delimiter stay buffered for the next call.
#
# Any StandardError raised while processing (including one raised by the
# callback) is logged through @log.error and the handler is closed.
#
# @param data [String] newly received bytes
def on_read(data)
  @buffer << data

  # An empty delimiter matches at every offset without advancing the scan
  # position, so the loop below would never terminate. Fail fast instead;
  # the rescue clause logs the error and closes the handler.
  raise ArgumentError, "delimiter must not be empty" if @delimiter.empty?

  pos = 0
  while i = @buffer.index(@delimiter, pos)
    msg = @buffer[pos...i]
    @callback.call(msg, @addr)
    pos = i + @delimiter.length
  end

  # Drop everything that has been dispatched, keeping only the unterminated tail.
  @buffer.slice!(0, pos) if pos > 0
rescue => e
  @log.error "unexpected error", error: e
  close
end