Class: Fluent::PluginHelper::Server::EventHandler::TLSServer

Inherits:
Coolio::Socket
  • Object
show all
Defined in:
lib/fluent/plugin_helper/server.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sock, context, socket_option_setter, close_callback, log, under_plugin_development, connect_callback) ⇒ TLSServer

It can’t use Coolio::TCPSocket, because Coolio::TCPSocket checks that underlying socket (1st argument of super) is TCPSocket.

Raises:

  • (ArgumentError)


664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
# File 'lib/fluent/plugin_helper/server.rb', line 664

def initialize(sock, context, socket_option_setter, close_callback, log, under_plugin_development, connect_callback)
  raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}" unless sock.is_a?(TCPSocket)

  socket_option_setter.call(sock)
  @_handler_socket = OpenSSL::SSL::SSLSocket.new(sock, context)
  @_handler_socket.sync_close = true
  @_handler_write_buffer = ''.force_encoding('ascii-8bit')
  @_handler_accepted = false
  super(@_handler_socket)

  @log = log
  @under_plugin_development = under_plugin_development

  @connect_callback = connect_callback
  @data_callback = nil
  @close_callback = close_callback

  @callback_connection = nil
  @close_after_write_complete = false
  @closing = false

  @mutex = Mutex.new # to serialize #write and #close
end

Instance Attribute Details

#close_after_write_complete=(value) ⇒ Object (writeonly)

Sets the attribute close_after_write_complete

Parameters:

  • value

    the value to set the attribute close_after_write_complete to.



661
662
663
# File 'lib/fluent/plugin_helper/server.rb', line 661

def close_after_write_complete=(value)
  @close_after_write_complete = value
end

#closingObject (readonly)

Returns the value of attribute closing.



660
661
662
# File 'lib/fluent/plugin_helper/server.rb', line 660

def closing
  @closing
end

Instance Method Details

#closeObject



808
809
810
811
812
813
814
815
# File 'lib/fluent/plugin_helper/server.rb', line 808

def close
  @mutex.synchronize do
    return if @closing
    @closing = true
    @close_callback.call(self)
    super
  end
end

#data(&callback) ⇒ Object



692
693
694
695
696
697
698
699
700
701
702
# File 'lib/fluent/plugin_helper/server.rb', line 692

def data(&callback)
  raise "data callback can be registered just once, but registered twice" if self.singleton_methods.include?(:on_read)
  @data_callback = callback
  on_read_impl = case callback.arity
                 when 1 then :on_read_without_connection
                 when 2 then :on_read_with_connection
                 else
                   raise "BUG: callback block must have 1 or 2 arguments"
                 end
  self.define_singleton_method(:on_read, method(on_read_impl))
end

#on_connectObject



746
747
748
# File 'lib/fluent/plugin_helper/server.rb', line 746

def on_connect
  try_tls_accept
end

#on_read_with_connection(data) ⇒ Object



799
800
801
802
803
804
805
806
# File 'lib/fluent/plugin_helper/server.rb', line 799

def on_read_with_connection(data)
  @data_callback.call(data, @callback_connection)
rescue => e
  @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
  @log.error_backtrace
  close rescue nil
  raise if @under_plugin_development
end

#on_read_without_connection(data) ⇒ Object



790
791
792
793
794
795
796
797
# File 'lib/fluent/plugin_helper/server.rb', line 790

def on_read_without_connection(data)
  @data_callback.call(data)
rescue => e
  @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
  @log.error_backtrace
  close rescue nil
  raise if @under_plugin_development
end

#on_readableObject



750
751
752
753
754
755
756
757
758
759
# File 'lib/fluent/plugin_helper/server.rb', line 750

def on_readable
  if try_tls_accept
    super
  end
rescue IO::WaitReadable, IO::WaitWritable
  # ignore and return with doing nothing
rescue OpenSSL::SSL::SSLError => e
  @log.warn "close socket due to unexpected ssl error: #{e}"
  close rescue nil
end

#on_writableObject



761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
# File 'lib/fluent/plugin_helper/server.rb', line 761

def on_writable
  begin
    @mutex.synchronize do
      # Consider write_nonblock with {exception: false} when IO::WaitWritable error happens frequently.
      written_bytes = @_handler_socket.write_nonblock(@_handler_write_buffer)
      @_handler_write_buffer.slice!(0, written_bytes)
    end

    # No need to call `super` in a synchronized context because TLSServer doesn't use the inner buffer(::IO::Buffer) of Coolio::IO.
    # Instead of using Coolio::IO's inner buffer, TLSServer has own buffer(`@_handler_write_buffer`). See also TLSServer#write.
    # Actually, the only reason calling `super` here is call Coolio::IO#disable_write_watcher.
    # If `super` is called in a synchronized context, it could cause a mutex recursive locking since Coolio::IO#on_write_complete
    # eventually calls TLSServer#close which try to get a lock.
    super

    close if @close_after_write_complete
  rescue IO::WaitWritable, IO::WaitReadable
    return
  rescue Errno::EINTR
    return
  rescue SystemCallError, IOError, SocketError
    # SystemCallError catches Errno::EPIPE & Errno::ECONNRESET amongst others.
    close rescue nil
    return
  rescue OpenSSL::SSL::SSLError => e
    @log.debug "unexpected SSLError while writing data into socket connected via TLS", error: e
  end
end

#to_ioObject



688
689
690
# File 'lib/fluent/plugin_helper/server.rb', line 688

def to_io
  @_handler_socket.to_io
end

#try_tls_acceptObject



712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
# File 'lib/fluent/plugin_helper/server.rb', line 712

def try_tls_accept
  return true if @_handler_accepted

  begin
    result = @_handler_socket.accept_nonblock(exception: false) # this method call actually try to do handshake via TLS
    if result == :wait_readable || result == :wait_writable
      # retry accept_nonblock: there aren't enough data in underlying socket buffer
    else
      @_handler_accepted = true

      @callback_connection = TLSCallbackSocket.new(self)
      @connect_callback.call(@callback_connection)
      unless @data_callback
        raise "connection callback must call #data to set data callback"
      end

      return true
    end
  rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, Errno::ECONNREFUSED, Errno::EHOSTUNREACH => e
    peeraddr = (@_handler_socket.peeraddr rescue PEERADDR_FAILED)
    @log.trace "unexpected error before accepting TLS connection",
               addr: peeraddr[3], host: peeraddr[2], port: peeraddr[1], error: e
    close rescue nil
  rescue OpenSSL::SSL::SSLError => e
    peeraddr = (@_handler_socket.peeraddr rescue PEERADDR_FAILED)
    # Use same log level as on_readable
    @log.warn "unexpected error before accepting TLS connection by OpenSSL",
              addr: peeraddr[3], host: peeraddr[2], port: peeraddr[1], error: e
    close rescue nil
  end

  false
end

#write(data) ⇒ Object



704
705
706
707
708
709
710
# File 'lib/fluent/plugin_helper/server.rb', line 704

def write(data)
  @mutex.synchronize do
    @_handler_write_buffer << data
    schedule_write
    data.bytesize
  end
end