Class: LogStash::Inputs::Tcp
- Defined in:
- lib/logstash/inputs/tcp.rb
Overview
Read events over a TCP socket.
Like stdin and file inputs, each event is assumed to be one line of text.
Can either accept connections from clients or connect to a server, depending on `mode`.
Direct Known Subclasses
Defined Under Namespace
Classes: Interrupted
Constant Summary
Constants included from Config::Mixin
Instance Attribute Summary
Attributes inherited from Base
Attributes included from Config::Mixin
Attributes inherited from Plugin
Instance Method Summary collapse
-
#initialize(*args) ⇒ Tcp
constructor
A new instance of Tcp.
- #register ⇒ Object
- #run(output_queue) ⇒ Object
-
#run_client(output_queue) ⇒ Object
Client mode: connects to the configured host and port in a loop and passes each socket to handle_socket.
-
#run_server(output_queue) ⇒ Object
Server mode: accepts connections on the server socket and handles each one in its own thread.
- #teardown ⇒ Object
Methods inherited from Base
Methods included from Config::Mixin
Methods inherited from Plugin
#eql?, #finished, #finished?, #hash, #inspect, lookup, #reload, #running?, #shutdown, #terminating?, #to_s
Constructor Details
#initialize(*args) ⇒ Tcp
Returns a new instance of Tcp.
57 58 59 |
# File 'lib/logstash/inputs/tcp.rb', line 57
#
# Builds a new Tcp input. Every argument is forwarded unchanged to the
# superclass constructor; no additional state is set up here.
#
# @param args [Array] constructor arguments, passed straight through to +super+
def initialize(*args)
  # Bare `super` re-sends the method's own arguments, i.e. *args.
  super
end
Instance Method Details
#register ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/logstash/inputs/tcp.rb', line 62
#
# Prepares the input before it is run.
#
# * Loads the socket, timeout and openssl libraries.
# * When @ssl_enable is set, builds @ssl_context from the certificate at
#   @ssl_cert and the RSA key at @ssl_key (unlocked with @ssl_key_passphrase).
#   When @ssl_verify is also set, a certificate store is populated from the
#   system defaults plus @ssl_cacert (a directory or a single file) and peer
#   certificates become mandatory.
# * In server mode, binds a TCPServer to @host:@port and, when SSL is enabled,
#   wraps it in an OpenSSL::SSL::SSLServer.
#
# @raise [Errno::EADDRINUSE] re-raised after logging when the port is taken
def register
  require "socket"
  require "timeout"
  require "openssl"

  if @ssl_enable
    @ssl_context = OpenSSL::SSL::SSLContext.new

    certificate_pem = File.read(@ssl_cert)
    @ssl_context.cert = OpenSSL::X509::Certificate.new(certificate_pem)

    key_pem = File.read(@ssl_key)
    @ssl_context.key = OpenSSL::PKey::RSA.new(key_pem, @ssl_key_passphrase)

    if @ssl_verify
      @cert_store = OpenSSL::X509::Store.new
      # Start from the system default certificate locations.
      @cert_store.set_default_paths

      # @ssl_cacert may name either a directory of CA certs or a single file.
      loader = File.directory?(@ssl_cacert) ? :add_path : :add_file
      @cert_store.public_send(loader, @ssl_cacert)

      @ssl_context.cert_store = @cert_store
      @ssl_context.verify_mode =
        OpenSSL::SSL::VERIFY_PEER | OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
    end
  end

  # Nothing is bound up front in client mode.
  return unless server?

  @logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}")
  begin
    @server_socket = TCPServer.new(@host, @port)
  rescue Errno::EADDRINUSE
    @logger.error("Could not start TCP server: Address in use",
                  :host => @host, :port => @port)
    raise
  end

  # Layer TLS over the listening socket when SSL is enabled.
  @server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context) if @ssl_enable
end
#run(output_queue) ⇒ Object
149 150 151 152 153 154 155 |
# File 'lib/logstash/inputs/tcp.rb', line 149
#
# Entry point for the input: dispatches to the server or the client loop
# depending on what +server?+ reports.
#
# @param output_queue [Object] passed unchanged to run_server / run_client
# @return [Object] whatever the selected loop returns
def run(output_queue)
  return run_server(output_queue) if server?

  run_client(output_queue)
end
#run_client(output_queue) ⇒ Object
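Client-mode loop: opens a TCP (optionally SSL) connection to the configured host and port and passes the socket to handle_socket.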
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/logstash/inputs/tcp.rb', line 200
#
# Client-mode main loop. Repeatedly connects to @host:@port, optionally
# performs an SSL handshake using @ssl_context, mixes
# ::LogStash::Util::SocketPeer into the socket and passes it to
# handle_socket together with a clone of @codec.
#
# A failed SSL handshake is logged, the failed connection is closed, and the
# loop retries after a 5 second pause.
#
# @param output_queue [Object] passed unchanged to handle_socket
# @raise [SystemCallError, SocketError] connection errors from TCPSocket.new
#   are not rescued here and propagate to the caller
def run_client(output_queue)
  @thread = Thread.current
  # Declared up front so the ensure clause can tell "never connected"
  # (nil) apart from "have a socket to clean up".
  client_socket = nil

  while true
    client_socket = TCPSocket.new(@host, @port)

    if @ssl_enable
      tcp_socket = client_socket
      client_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, @ssl_context)
      begin
        client_socket.connect
      rescue OpenSSL::SSL::SSLError => ssle
        @logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace)
        # Close the underlying TCP socket before retrying; otherwise every
        # failed handshake leaks one file descriptor.
        tcp_socket.close
        client_socket = nil
        # NOTE(mrichar1): Hack to prevent hammering peer
        sleep(5)
        next
      end
    end

    # Monkeypatch a 'peer' method onto the socket.
    client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
    @logger.debug("Opened connection", :client => "#{client_socket.peer}")
    handle_socket(client_socket, client_socket.peer, output_queue, @codec.clone)
  end # loop
ensure
  # client_socket is nil when the very first TCPSocket.new raised; calling
  # close unconditionally would replace that error with a NoMethodError.
  client_socket.close if client_socket && !client_socket.closed?
end
#run_server(output_queue) ⇒ Object
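Server-mode loop: accepts connections on the server socket, starts one thread per connection, and stops when teardown signals a shutdown.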
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/logstash/inputs/tcp.rb', line 157
#
# Server-mode main loop. Accepts connections on @server_socket and starts one
# thread per connection; each thread mixes ::LogStash::Util::SocketPeer into
# its socket and passes it to handle_socket with a clone of @codec.
#
# Shutdown: when an IOError or LogStash::ShutdownSignal arrives while
# @interrupted is set, the listener is closed, every tracked client thread is
# sent LogStash::ShutdownSignal, and the loop exits. An IOError or
# ShutdownSignal raised while @interrupted is not set is re-raised from the
# loop; a ShutdownSignal is then swallowed by the method-level rescue, while
# an IOError propagates to the caller.
#
# @param output_queue [Object] passed unchanged to handle_socket
# @raise [IOError] when accept fails and no shutdown was requested
def run_server(output_queue)
  @thread = Thread.current
  @client_threads = []

  loop do
    # Forget connection threads that have already finished; without this the
    # list grows by one entry per connection for the life of the process.
    @client_threads.delete_if { |t| !t.alive? }

    # Start a new thread for each connection.
    begin
      @client_threads << Thread.start(@server_socket.accept) do |s|
        # TODO(sissel): put this block in its own method.
        # monkeypatch a 'peer' method onto the socket.
        s.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
        @logger.debug("Accepted connection", :client => s.peer,
                      :server => "#{@host}:#{@port}")
        begin
          handle_socket(s, s.peer, output_queue, @codec.clone)
        rescue Interrupted
          # Best-effort close; the connection is being abandoned anyway.
          s.close rescue nil
        end
      end # Thread.start
    rescue OpenSSL::SSL::SSLError => ssle
      # NOTE(mrichar1): This doesn't return a useful error message for some reason
      @logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace)
    rescue IOError, LogStash::ShutdownSignal
      # Not an intended shutdown: propagate the original exception.
      raise unless @interrupted

      # Intended shutdown: stop listening, tell the clients, leave the loop.
      @server_socket.close
      @client_threads.each do |thread|
        thread.raise(LogStash::ShutdownSignal)
      end
      break
    end
  end # loop
rescue LogStash::ShutdownSignal
  # nothing to do
ensure
  begin
    @server_socket.close
  rescue IOError
    # The shutdown branch has already closed the listener; a second close
    # raises "closed stream" on some runtimes and must not escape from here.
  end
end
#teardown ⇒ Object
225 226 227 228 229 230 |
# File 'lib/logstash/inputs/tcp.rb', line 225
#
# Requests shutdown of the server loop. In server mode this sets the
# @interrupted flag and raises LogStash::ShutdownSignal in the thread stored
# in @thread. When +server?+ is false, nothing happens.
def teardown
  return unless server?

  # Set the flag first so the server loop treats the signal as an intended
  # shutdown rather than a genuine error.
  @interrupted = true
  @thread.raise(LogStash::ShutdownSignal)
end