Class: Fluent::Plugin::ForwardOutput::ConnectionManager

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/out_forward/connection_manager.rb

Defined Under Namespace

Classes: RequestInfo

Instance Method Summary collapse

Constructor Details

#initialize(log:, secure:, connection_factory:, socket_cache:) ⇒ ConnectionManager

Returns a new instance of ConnectionManager.

Parameters:

  • log (Logger)
  • secure (Boolean)
  • connection_factory (Proc)
  • SocketCache (Fluent::ForwardOutput::SocketCache)


28
29
30
31
32
33
# File 'lib/fluent/plugin/out_forward/connection_manager.rb', line 28

def initialize(log:, secure:, connection_factory:, socket_cache:)
  @log = log
  @secure = secure
  @connection_factory = connection_factory
  @socket_cache = socket_cache
end

Instance Method Details

#close(sock) ⇒ Object



72
73
74
75
76
77
78
79
# File 'lib/fluent/plugin/out_forward/connection_manager.rb', line 72

def close(sock)
  if @socket_cache
    @socket_cache.checkin(sock)
  else
    sock.close_write rescue nil
    sock.close rescue nil
  end
end

#connect(host:, port:, hostname:, ack: nil, &block) ⇒ Object

Parameters:

  • ack (Fluent::Plugin::ForwardOutput::AckHander::Ack|nil) (defaults to: nil)


40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/fluent/plugin/out_forward/connection_manager.rb', line 40

def connect(host:, port:, hostname:, ack: nil, &block)
  if @socket_cache
    return connect_keepalive(host: host, port: port, hostname: hostname, ack: ack, &block)
  end

  @log.debug('connect new socket')
  socket = @connection_factory.call(host, port, hostname)
  request_info = RequestInfo.new(@secure ? :helo : :established)

  unless block_given?
    return [socket, request_info]
  end

  begin
    yield(socket, request_info)
  ensure
    if ack
      ack.enqueue(socket)
    else
      socket.close_write rescue nil
      socket.close rescue nil
    end
  end
end

#purge_obsolete_socksObject



65
66
67
68
69
70
# File 'lib/fluent/plugin/out_forward/connection_manager.rb', line 65

def purge_obsolete_socks
  unless @socket_cache
    raise "Do not call this method without keepalive option"
  end
  @socket_cache.purge_obsolete_socks
end

#stopObject



35
36
37
# File 'lib/fluent/plugin/out_forward/connection_manager.rb', line 35

def stop
  @socket_cache && @socket_cache.clear
end