Class: Fluent::PluginHelper::Server::EventHandler::TCPServer

Inherits:
Coolio::TCPSocket
  • Object
show all
Defined in:
lib/fluent/plugin_helper/server.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sock, socket_option_setter, close_callback, log, under_plugin_development, connect_callback) ⇒ TCPServer

Returns a new instance of TCPServer.

Raises:

  • (ArgumentError)


582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
# File 'lib/fluent/plugin_helper/server.rb', line 582

def initialize(sock, socket_option_setter, close_callback, log, under_plugin_development, connect_callback)
  raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}" unless sock.is_a?(TCPSocket)

  socket_option_setter.call(sock)

  @_handler_socket = sock
  super(sock)

  @log = log
  @under_plugin_development = under_plugin_development

  @connect_callback = connect_callback
  @data_callback = nil
  @close_callback = close_callback

  @callback_connection = nil
  @close_after_write_complete = false
  @closing = false

  @mutex = Mutex.new # to serialize #write and #close
end

Instance Attribute Details

#close_after_write_complete=(value) ⇒ Object (writeonly)

Sets the attribute close_after_write_complete

Parameters:

  • value

    the value to set the attribute close_after_write_complete to.



580
581
582
# File 'lib/fluent/plugin_helper/server.rb', line 580

def close_after_write_complete=(value)
  @close_after_write_complete = value
end

#closingObject (readonly)

Returns the value of attribute closing.



579
580
581
# File 'lib/fluent/plugin_helper/server.rb', line 579

def closing
  @closing
end

Instance Method Details

#closeObject



657
658
659
660
661
662
663
664
# File 'lib/fluent/plugin_helper/server.rb', line 657

def close
  @mutex.synchronize do
    return if @closing
    @closing = true
    @close_callback.call(self)
    super
  end
end

#data(&callback) ⇒ Object



608
609
610
611
612
613
614
615
616
617
618
# File 'lib/fluent/plugin_helper/server.rb', line 608

def data(&callback)
  raise "data callback can be registered just once, but registered twice" if self.singleton_methods.include?(:on_read)
  @data_callback = callback
  on_read_impl = case callback.arity
                 when 1 then :on_read_without_connection
                 when 2 then :on_read_with_connection
                 else
                   raise "BUG: callback block must have 1 or 2 arguments"
                 end
  self.define_singleton_method(:on_read, method(on_read_impl))
end

#on_connectObject



631
632
633
634
635
636
637
# File 'lib/fluent/plugin_helper/server.rb', line 631

def on_connect
  @callback_connection = TCPCallbackSocket.new(self)
  @connect_callback.call(@callback_connection)
  unless @data_callback
    raise "connection callback must call #data to set data callback"
  end
end

#on_read_with_connection(data) ⇒ Object



648
649
650
651
652
653
654
655
# File 'lib/fluent/plugin_helper/server.rb', line 648

def on_read_with_connection(data)
  @data_callback.call(data, @callback_connection)
rescue => e
  @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
  @log.error_backtrace
  close rescue nil
  raise if @under_plugin_development
end

#on_read_without_connection(data) ⇒ Object



639
640
641
642
643
644
645
646
# File 'lib/fluent/plugin_helper/server.rb', line 639

def on_read_without_connection(data)
  @data_callback.call(data)
rescue => e
  @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
  @log.error_backtrace
  close rescue nil
  raise if @under_plugin_development
end

#on_writableObject



626
627
628
629
# File 'lib/fluent/plugin_helper/server.rb', line 626

def on_writable
  super
  close if @close_after_write_complete
end

#to_ioObject



604
605
606
# File 'lib/fluent/plugin_helper/server.rb', line 604

def to_io
  @_handler_socket
end

#write(data) ⇒ Object



620
621
622
623
624
# File 'lib/fluent/plugin_helper/server.rb', line 620

def write(data)
  @mutex.synchronize do
    super
  end
end