Class: Fluent::PluginHelper::Server::EventHandler::TLSServer

Inherits:
Coolio::Socket
  • Object
show all
Defined in:
lib/fluent/plugin_helper/server.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sock, context, socket_option_setter, close_callback, log, under_plugin_development, connect_callback) ⇒ TLSServer

It can’t use Coolio::TCPSocket, because Coolio::TCPSocket checks that underlying socket (1st argument of super) is TCPSocket.

Raises:

  • (ArgumentError)


672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
# File 'lib/fluent/plugin_helper/server.rb', line 672

def initialize(sock, context, socket_option_setter, close_callback, log, under_plugin_development, connect_callback)
  raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}" unless sock.is_a?(TCPSocket)

  socket_option_setter.call(sock)
  @_handler_socket = OpenSSL::SSL::SSLSocket.new(sock, context)
  @_handler_socket.sync_close = true
  @_handler_write_buffer = ''.force_encoding('ascii-8bit')
  @_handler_accepted = false
  super(@_handler_socket)

  @log = log
  @under_plugin_development = under_plugin_development

  @connect_callback = connect_callback
  @data_callback = nil
  @close_callback = close_callback

  @callback_connection = nil
  @close_after_write_complete = false
  @closing = false

  @mutex = Mutex.new # to serialize #write and #close
end

Instance Attribute Details

#close_after_write_complete=(value) ⇒ Object (writeonly)

Sets the attribute close_after_write_complete

Parameters:

  • value

    the value to set the attribute close_after_write_complete to.



669
670
671
# File 'lib/fluent/plugin_helper/server.rb', line 669

def close_after_write_complete=(value)
  @close_after_write_complete = value
end

#closingObject (readonly)

Returns the value of attribute closing.



668
669
670
# File 'lib/fluent/plugin_helper/server.rb', line 668

def closing
  @closing
end

Instance Method Details

#closeObject



816
817
818
819
820
821
822
823
# File 'lib/fluent/plugin_helper/server.rb', line 816

def close
  @mutex.synchronize do
    return if @closing
    @closing = true
    @close_callback.call(self)
    super
  end
end

#data(&callback) ⇒ Object



700
701
702
703
704
705
706
707
708
709
710
# File 'lib/fluent/plugin_helper/server.rb', line 700

def data(&callback)
  raise "data callback can be registered just once, but registered twice" if self.singleton_methods.include?(:on_read)
  @data_callback = callback
  on_read_impl = case callback.arity
                 when 1 then :on_read_without_connection
                 when 2 then :on_read_with_connection
                 else
                   raise "BUG: callback block must have 1 or 2 arguments"
                 end
  self.define_singleton_method(:on_read, method(on_read_impl))
end

#on_connectObject



754
755
756
# File 'lib/fluent/plugin_helper/server.rb', line 754

def on_connect
  try_tls_accept
end

#on_read_with_connection(data) ⇒ Object



807
808
809
810
811
812
813
814
# File 'lib/fluent/plugin_helper/server.rb', line 807

def on_read_with_connection(data)
  @data_callback.call(data, @callback_connection)
rescue => e
  @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
  @log.error_backtrace
  close rescue nil
  raise if @under_plugin_development
end

#on_read_without_connection(data) ⇒ Object



798
799
800
801
802
803
804
805
# File 'lib/fluent/plugin_helper/server.rb', line 798

def on_read_without_connection(data)
  @data_callback.call(data)
rescue => e
  @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
  @log.error_backtrace
  close rescue nil
  raise if @under_plugin_development
end

#on_readableObject



758
759
760
761
762
763
764
765
766
767
# File 'lib/fluent/plugin_helper/server.rb', line 758

def on_readable
  if try_tls_accept
    super
  end
rescue IO::WaitReadable, IO::WaitWritable
  # ignore and return with doing nothing
rescue OpenSSL::SSL::SSLError => e
  @log.warn "close socket due to unexpected ssl error: #{e}"
  close rescue nil
end

#on_writableObject



769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
# File 'lib/fluent/plugin_helper/server.rb', line 769

def on_writable
  begin
    @mutex.synchronize do
      # Consider write_nonblock with {exception: false} when IO::WaitWritable error happens frequently.
      written_bytes = @_handler_socket.write_nonblock(@_handler_write_buffer)
      @_handler_write_buffer.slice!(0, written_bytes)
    end

    # No need to call `super` in a synchronized context because TLSServer doesn't use the inner buffer(::IO::Buffer) of Coolio::IO.
    # Instead of using Coolio::IO's inner buffer, TLSServer has own buffer(`@_handler_write_buffer`). See also TLSServer#write.
    # Actually, the only reason calling `super` here is call Coolio::IO#disable_write_watcher.
    # If `super` is called in a synchronized context, it could cause a mutex recursive locking since Coolio::IO#on_write_complete
    # eventually calls TLSServer#close which try to get a lock.
    super

    close if @close_after_write_complete
  rescue IO::WaitWritable, IO::WaitReadable
    return
  rescue Errno::EINTR
    return
  rescue SystemCallError, IOError, SocketError
    # SystemCallError catches Errno::EPIPE & Errno::ECONNRESET amongst others.
    close rescue nil
    return
  rescue OpenSSL::SSL::SSLError => e
    @log.debug "unexpected SSLError while writing data into socket connected via TLS", error: e
  end
end

#to_ioObject



696
697
698
# File 'lib/fluent/plugin_helper/server.rb', line 696

def to_io
  @_handler_socket.to_io
end

#try_tls_acceptObject



720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
# File 'lib/fluent/plugin_helper/server.rb', line 720

def try_tls_accept
  return true if @_handler_accepted

  begin
    result = @_handler_socket.accept_nonblock(exception: false) # this method call actually try to do handshake via TLS
    if result == :wait_readable || result == :wait_writable
      # retry accept_nonblock: there aren't enough data in underlying socket buffer
    else
      @_handler_accepted = true

      @callback_connection = TLSCallbackSocket.new(self)
      @connect_callback.call(@callback_connection)
      unless @data_callback
        raise "connection callback must call #data to set data callback"
      end

      return true
    end
  rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, Errno::ECONNREFUSED, Errno::EHOSTUNREACH => e
    peeraddr = (@_handler_socket.peeraddr rescue PEERADDR_FAILED)
    @log.trace "unexpected error before accepting TLS connection",
               addr: peeraddr[3], host: peeraddr[2], port: peeraddr[1], error: e
    close rescue nil
  rescue OpenSSL::SSL::SSLError => e
    peeraddr = (@_handler_socket.peeraddr rescue PEERADDR_FAILED)
    # Use same log level as on_readable
    @log.warn "unexpected error before accepting TLS connection by OpenSSL",
              addr: peeraddr[3], host: peeraddr[2], port: peeraddr[1], error: e
    close rescue nil
  end

  false
end

#write(data) ⇒ Object



712
713
714
715
716
717
718
# File 'lib/fluent/plugin_helper/server.rb', line 712

def write(data)
  @mutex.synchronize do
    @_handler_write_buffer << data
    schedule_write
    data.bytesize
  end
end