Class: Fluent::Compat::SocketUtil::TcpHandler
- Inherits: Coolio::Socket
- Inheritance chain: Object → Coolio::Socket → Fluent::Compat::SocketUtil::TcpHandler
- Defined in:
- lib/fluent/compat/socket_util.rb
Constant Summary collapse
- PEERADDR_FAILED =
["?", "?", "name resolution failed", "?"]
Instance Method Summary collapse
- #initialize(io, log, delimiter, callback) ⇒ TcpHandler (constructor): returns a new instance of TcpHandler.
- #on_close ⇒ Object
- #on_connect ⇒ Object
- #on_read(data) ⇒ Object
Constructor Details
#initialize(io, log, delimiter, callback) ⇒ TcpHandler
Returns a new instance of TcpHandler.
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/fluent/compat/socket_util.rb', line 57
#
# Wraps +io+ in a handler that buffers incoming data and hands every
# delimiter-terminated message to +callback+ (see #on_read).
#
# @param io [IO] socket passed on to the superclass constructor; when it is
#   a TCPSocket its peer address is recorded and SO_LINGER is set on it
# @param log [#trace, #error] logger used for trace and error output
# @param delimiter [String] separator that terminates each message
# @param callback [#call] invoked as +callback.call(msg, addr)+ per message
def initialize(io, log, delimiter, callback)
  super(io)
  @timeout = 0

  # Always define @addr: on_read forwards it to the callback even when io is
  # not a TCPSocket, in which case it stays nil.
  @addr = nil
  if io.is_a?(TCPSocket)
    # Best effort: fall back to a placeholder when the peer address cannot
    # be obtained.
    @addr = begin
      io.peeraddr
    rescue StandardError
      PEERADDR_FAILED
    end

    opt = [1, @timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; }
    io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
  end

  @delimiter = delimiter
  @callback = callback
  @log = log
  @log.trace { "accepted fluent socket object_id=#{object_id}" }

  # String.new with no arguments yields an empty, unfrozen ASCII-8BIT string,
  # so this also works when string literals are frozen.
  @buffer = String.new
end
Instance Method Details
#on_close ⇒ Object
91 92 93 |
# File 'lib/fluent/compat/socket_util.rb', line 91
#
# Emits a trace-level log entry identifying this handler by object id.
def on_close
  @log.trace do
    "closed fluent socket object_id=#{object_id}"
  end
end
#on_connect ⇒ Object
73 74 |
# File 'lib/fluent/compat/socket_util.rb', line 73
#
# Connection hook with an empty body: this handler takes no action here.
def on_connect; end
#on_read(data) ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/fluent/compat/socket_util.rb', line 76
#
# Appends +data+ to the receive buffer and invokes the callback once, as
# +callback.call(msg, addr)+, for every complete delimiter-terminated message
# now in the buffer. Whatever follows the last delimiter stays buffered for
# the next call.
#
# Any StandardError raised while processing is logged and the handler is
# closed; consumed messages are not removed from the buffer in that case.
#
# @param data [String] chunk to append to the buffer
def on_read(data)
  @buffer << data

  # Search with a copy of the delimiter re-tagged in the buffer's encoding.
  # Matching a non-ASCII delimiter against the binary buffer otherwise raises
  # Encoding::CompatibilityError, and its character length would not equal
  # the number of buffer positions it occupies.
  # NOTE(review): an empty delimiter would never advance pos — presumably
  # callers never configure one; confirm.
  needle = @delimiter.dup.force_encoding(@buffer.encoding)
  step = needle.length

  pos = 0
  while (i = @buffer.index(needle, pos))
    @callback.call(@buffer[pos...i], @addr)
    pos = i + step
  end

  # Drop everything that was delivered, keeping the partial trailing message.
  @buffer.slice!(0, pos) if pos > 0
rescue => e
  @log.error "unexpected error", error: e
  close
end