Class: LogStash::Outputs::Tcp

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/outputs/tcp.rb

Overview

Write events over a TCP socket.

Each event's JSON is separated from the next by a newline.

Can either accept connections from clients or connect to a server, depending on `mode`.

Defined Under Namespace

Classes: Client

Instance Method Summary collapse

Instance Method Details

#close ⇒ Object



296
297
298
299
300
301
302
303
304
305
# File 'lib/logstash/outputs/tcp.rb', line 296

# Shuts the output down: flags the plugin as closed, then closes the listening
# socket (if one exists) and every client recorded on the tracked client threads.
#
# Closing is best-effort: any StandardError raised by an individual #close call
# is swallowed so the remaining sockets still get closed.
#
# @return [nil, Object] nil when no client threads are tracked, otherwise the
#   client thread collection
def close
  @closed.make_true

  if @server_socket
    begin
      @server_socket.close
    rescue StandardError
      nil # already closed or failing socket; nothing more to do
    end
  end

  return if !@client_threads

  @client_threads.each do |worker|
    connection = worker[:client]
    next unless connection

    begin
      connection.close
    rescue StandardError
      nil # keep going so the other clients are closed too
    end
  end
end

#log_error(msg, e, backtrace: @logger.info?, **details) ⇒ Object



313
314
315
316
317
# File 'lib/logstash/outputs/tcp.rb', line 313

# Logs +msg+ at error level together with details about exception +e+.
#
# @param msg [String] the log message
# @param e [Exception] the exception being reported
# @param backtrace [Boolean] whether to attach e.backtrace; defaults to
#   whatever @logger.info? returns
# @param details [Hash] extra key/value pairs to log; :message and :exception
#   are overwritten with the exception's message and class
# @return [Object] whatever @logger.error returns
def log_error(msg, e, backtrace: @logger.info?, **details)
  exception_fields = { message: e.message, exception: e.class }
  exception_fields[:backtrace] = e.backtrace if backtrace
  @logger.error(msg, details.merge(exception_fields))
end

#log_warn(msg, e, backtrace: @logger.debug?, **details) ⇒ Object



307
308
309
310
311
# File 'lib/logstash/outputs/tcp.rb', line 307

# Logs +msg+ at warn level together with details about exception +e+.
#
# @param msg [String] the log message
# @param e [Exception] the exception being reported
# @param backtrace [Boolean] whether to attach e.backtrace; defaults to
#   whatever @logger.debug? returns
# @param details [Hash] extra key/value pairs to log; :message and :exception
#   are overwritten with the exception's message and class
# @return [Object] whatever @logger.warn returns
def log_warn(msg, e, backtrace: @logger.debug?, **details)
  exception_fields = { message: e.message, exception: e.class }
  exception_fields[:backtrace] = e.backtrace if backtrace
  @logger.warn(msg, details.merge(exception_fields))
end

#receive(event) ⇒ Object



291
292
293
# File 'lib/logstash/outputs/tcp.rb', line 291

# Hands +event+ to the configured codec for encoding. Delivery itself happens
# in whichever callback was registered on the codec via #on_event.
#
# @param event [Object] the event to encode
# @return [Object] whatever the codec's #encode returns
def receive(event)
  @codec.encode(event)
end

#register ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/logstash/outputs/tcp.rb', line 183

# Prepares the output for use: loads its runtime dependencies, validates the
# SSL settings, initialises the shared state (closed flag and client thread
# counter), sets up SSL when enabled, and finally starts in server or client
# mode depending on what #server? reports.
#
# @return [Object] the result of #run_as_server or #run_as_client
def register
  # Loaded here rather than at file load time.
  require "socket"
  require "stud/try"

  validate_ssl_config!

  @closed = Concurrent::AtomicBoolean.new(false)
  @thread_no = Concurrent::AtomicFixnum.new(0)

  if @ssl_enabled
    setup_ssl
  end

  server? ? run_as_server : run_as_client
end

#run_as_client ⇒ Object



263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/logstash/outputs/tcp.rb', line 263

# Registers the codec callback used in client mode: every encoded payload is
# written to a single outbound socket, which is (re)established on demand via
# #connect.
#
# On any StandardError the socket is closed and dropped, the failure is logged
# through #log_warn, and after sleeping @reconnect_interval the whole payload
# is sent again over a fresh connection. The retry is unbounded.
#
# @return [Object] whatever @codec.on_event returns
def run_as_client
  client_socket = nil
  @codec.on_event do |_event, payload|
    begin
      client_socket = connect unless client_socket

      # Track progress in a separate variable so that a reconnect (the outer
      # `retry`) restarts from the full payload. Truncating `payload` itself
      # would make the new connection receive only the unsent tail of the record.
      remaining = payload
      while remaining && remaining.bytesize > 0
        begin
          written_bytes_size = client_socket.write_nonblock(remaining)
          remaining = remaining.byteslice(written_bytes_size..-1)
        rescue IO::WaitReadable
          # The socket needs to become readable before the write can proceed.
          IO.select([client_socket])
          retry
        rescue IO::WaitWritable
          # Send buffer is full; wait until the socket accepts more data.
          IO.select(nil, [client_socket])
          retry
        end
      end
    rescue => e
      log_warn "client socket failed:", e, host: @host, port: @port, socket: (client_socket ? client_socket.to_s : nil)
      # client_socket may be nil when #connect itself failed; the resulting
      # NoMethodError is deliberately swallowed.
      client_socket.close rescue nil
      client_socket = nil
      sleep @reconnect_interval
      retry
    end
  end
end

#run_as_server ⇒ Object



200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/logstash/outputs/tcp.rb', line 200

# Starts the output in server mode.
#
# Binds a TCPServer on @host/@port (wrapped in an SSLServer when SSL is
# enabled), spawns an accept thread that starts one worker thread per accepted
# connection, and registers a codec callback that writes every encoded payload
# to the client of each worker thread that is still alive.
#
# @raise [Errno::EADDRINUSE] re-raised after logging when the address is taken
# @return [Object] whatever @codec.on_event returns
def run_as_server
  @logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}")
  begin
    @server_socket = TCPServer.new(@host, @port)
  rescue Errno::EADDRINUSE
    @logger.error("Could not start tcp server: Address in use", host: @host, port: @port)
    raise
  end
  if @ssl_enabled
    @server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context)
  end # @ssl_enabled
  @client_threads = Concurrent::Array.new

  @accept_thread = Thread.new(@server_socket) do |server_socket|
    LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|server_accept")
    loop do
      break if @closed.value
      # OpenSSL::SSL::SSLServer does not support the #accept_nonblock method.
      # When SSL is enabled, it needs to use the blocking counterpart and ignore
      # SSLError errors, as they may be client's issues such as missing client's
      # certificates, ciphers, etc. If it's not rescued here, it would close the
      # TCP server and exit the plugin.
      # On the other hand, IOError should normally happen when the pipeline configuration
      # is reloaded, as the stream gets closed in the thread
      if @ssl_enabled
        begin
          client_socket = server_socket.accept
        rescue OpenSSL::SSL::SSLError => e
          log_warn("SSL Error", e)
          retry unless @closed.value
        rescue IOError => e
          log_warn("IO Error", e)
          retry unless @closed.value
        end
      else
        client_socket = server_socket.accept_nonblock exception: false
        if client_socket == :wait_readable
          IO.select [ server_socket ]
          next
        end
      end

      # When accept failed while the plugin is closing, the rescue branches
      # above fall through without a socket. Skip spawning a worker for nil;
      # the closed check at the top of the loop then ends the thread.
      next unless client_socket

      Thread.start(client_socket) do |socket|
        # monkeypatch a 'peer' method onto the socket.
        socket.extend(::LogStash::Util::SocketPeer)
        @logger.debug("accepted connection", client: socket.peer, server: "#{@host}:#{@port}")
        client = Client.new(socket, self)
        # Publish the client on the thread before registering the thread, so
        # anything iterating @client_threads always finds thread[:client] set.
        Thread.current[:client] = client
        LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@thread_no.increment}")
        @client_threads << Thread.current
        client.run unless @closed.value
      end
    end
  end

  @codec.on_event do |event, payload|
    # Drop workers whose thread has finished before fanning the payload out.
    @client_threads.select!(&:alive?)
    @client_threads.each do |client_thread|
      client_thread[:client].write(payload)
    end
  end
end