Class: Fluent::Plugin::ForwardOutput::ConnectionManager
- Inherits: Object
- Inheritance hierarchy:
  - Object
  - Fluent::Plugin::ForwardOutput::ConnectionManager
- Defined in:
- lib/fluent/plugin/out_forward/connection_manager.rb
Defined Under Namespace
Classes: RequestInfo
Instance Method Summary
- #close(sock) ⇒ Object
- #connect(host:, port:, hostname:, ack: nil, &block) ⇒ Object
- #initialize(log:, secure:, connection_factory:, socket_cache:) ⇒ ConnectionManager (constructor): A new instance of ConnectionManager.
- #purge_obsolete_socks ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(log:, secure:, connection_factory:, socket_cache:) ⇒ ConnectionManager
Returns a new instance of ConnectionManager.
28 29 30 31 32 33 |
# File 'lib/fluent/plugin/out_forward/connection_manager.rb', line 28
#
# Stores the collaborators used by the other methods of this class.
#
# @param log [#debug] logger; #connect writes a debug message to it
# @param secure [Boolean] when truthy, #connect starts a new connection's
#   RequestInfo in the :helo state instead of :established
# @param connection_factory [#call] invoked as `call(host, port, hostname)`
#   by #connect to obtain a new socket
# @param socket_cache [#checkin, #purge_obsolete_socks, #clear, nil] optional
#   cache; when nil, sockets are closed directly instead of being checked in
def initialize(log:, secure:, connection_factory:, socket_cache:)
  @log = log
  @secure = secure
  @connection_factory = connection_factory
  @socket_cache = socket_cache
end
Instance Method Details
#close(sock) ⇒ Object
72 73 74 75 76 77 78 79 |
# File 'lib/fluent/plugin/out_forward/connection_manager.rb', line 72
#
# Releases a socket: hands it to the socket cache when one is configured,
# otherwise shuts it down.
#
# @param sock [#close_write, #close] the socket to release
# @return [Object, nil] the result of `@socket_cache.checkin(sock)` when a
#   cache is configured; otherwise the result of `sock.close`, or nil if that
#   call raised
def close(sock)
  if @socket_cache
    @socket_cache.checkin(sock)
  else
    # Best-effort shutdown: any StandardError raised by either call is
    # deliberately ignored so that a failing close_write does not prevent
    # the final close from being attempted.
    sock.close_write rescue nil
    sock.close rescue nil
  end
end
#connect(host:, port:, hostname:, ack: nil, &block) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/fluent/plugin/out_forward/connection_manager.rb', line 40
#
# Obtains a socket for the given destination.
#
# When a socket cache is configured the call is delegated unchanged to
# `connect_keepalive`. Otherwise a new socket is requested from the
# connection factory and paired with a fresh RequestInfo.
#
# @param host [Object] passed through to the connection factory
# @param port [Object] passed through to the connection factory
# @param hostname [Object] passed through to the connection factory
# @param ack [#enqueue, nil] when given, the socket is enqueued on it after
#   the block finishes instead of being closed
# @yield [socket, request_info] optional; the socket is released when the
#   block exits, including when it raises
# @return [Array, Object] `[socket, request_info]` when no block is given
#   (nothing is closed here in that case), otherwise the block's value
def connect(host:, port:, hostname:, ack: nil, &block)
  if @socket_cache
    return connect_keepalive(host: host, port: port, hostname: hostname, ack: ack, &block)
  end

  @log.debug('connect new socket')
  socket = @connection_factory.call(host, port, hostname)
  request_info = RequestInfo.new(@secure ? :helo : :established)

  return [socket, request_info] unless block_given?

  begin
    yield(socket, request_info)
  ensure
    if ack
      ack.enqueue(socket)
    else
      # Best-effort shutdown: errors from either call are deliberately
      # ignored so cleanup never masks the block's own result or exception.
      socket.close_write rescue nil
      socket.close rescue nil
    end
  end
end
#purge_obsolete_socks ⇒ Object
65 66 67 68 69 70 |
# File 'lib/fluent/plugin/out_forward/connection_manager.rb', line 65
#
# Asks the socket cache to purge its obsolete sockets.
#
# @return [Object] whatever `@socket_cache.purge_obsolete_socks` returns
# @raise [RuntimeError] if no socket cache is configured
def purge_obsolete_socks
  raise "Do not call this method without keepalive option" unless @socket_cache

  @socket_cache.purge_obsolete_socks
end
#stop ⇒ Object
35 36 37 |
# File 'lib/fluent/plugin/out_forward/connection_manager.rb', line 35
#
# Clears the socket cache, if one is configured.
#
# @return [Object, nil, false] the result of `@socket_cache.clear`, or the
#   falsy `@socket_cache` value itself when no cache is configured
def stop
  @socket_cache && @socket_cache.clear
end