Class: Fluent::PluginHelper::Server::EventHandler::TCPServer

Inherits:
Coolio::TCPSocket
  • Object
show all
Defined in:
lib/fluent/plugin_helper/server.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sock, socket_option_setter, close_callback, log, under_plugin_development, connect_callback) ⇒ TCPServer

Returns a new instance of TCPServer.

Raises:

  • (ArgumentError)


574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
# File 'lib/fluent/plugin_helper/server.rb', line 574

# Wraps an accepted TCP socket and stores the callbacks and flags that the
# other methods of this handler read.
#
# @param sock [TCPSocket] the socket to wrap
# @param socket_option_setter [#call] called with +sock+ before the socket
#   is passed to the superclass constructor
# @param close_callback [#call] stored in @close_callback
# @param log [Object] stored in @log
# @param under_plugin_development [Boolean] stored in
#   @under_plugin_development
# @param connect_callback [#call] stored in @connect_callback
# @raise [ArgumentError] if +sock+ is not a TCPSocket
def initialize(sock, socket_option_setter, close_callback, log, under_plugin_development, connect_callback)
  unless sock.is_a?(TCPSocket)
    raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}"
  end

  socket_option_setter.call(sock)

  # Kept ahead of super: #to_io returns this ivar, and the superclass
  # constructor presumably may need it — confirm before reordering.
  @_handler_socket = sock
  super(sock)

  # Collaborators handed in by the caller.
  @log = log
  @under_plugin_development = under_plugin_development
  @connect_callback = connect_callback
  @close_callback = close_callback

  # Per-connection state: no data callback, no callback connection, and
  # not closing yet.
  @data_callback = nil
  @callback_connection = nil
  @close_after_write_complete = false
  @closing = false

  # Serializes #write and #close.
  @mutex = Mutex.new
end

Instance Attribute Details

#close_after_write_complete=(value) ⇒ Object (writeonly)

Sets the attribute close_after_write_complete

Parameters:

  • value

    the value to set the attribute close_after_write_complete to.



572
573
574
# File 'lib/fluent/plugin_helper/server.rb', line 572

# Sets @close_after_write_complete. When the stored value is truthy,
# #on_writable calls #close after running the superclass #on_writable.
#
# @param value the value to store
def close_after_write_complete=(value)
  @close_after_write_complete = value
end

#closing ⇒ Object (readonly)

Returns the value of attribute closing.



571
572
573
# File 'lib/fluent/plugin_helper/server.rb', line 571

# Returns @closing, which #initialize sets to false and #close sets to true
# once closing has started.
def closing
  @closing
end

Instance Method Details

#close ⇒ Object



649
650
651
652
653
654
655
656
# File 'lib/fluent/plugin_helper/server.rb', line 649

# Closes this connection at most once.
#
# Runs under @mutex (the same mutex #write uses). The first call marks the
# handler as closing, calls the close callback with this handler, and then
# delegates to the superclass #close. Later calls return nil without doing
# anything.
#
# If the close callback raises a StandardError, the superclass #close is
# still invoked before the error is re-raised. Otherwise a failing callback
# would leave the socket open while @closing already turns every further
# #close into a no-op.
#
# @return the superclass #close result, or nil when already closing
def close
  @mutex.synchronize do
    return if @closing
    @closing = true
    begin
      @close_callback.call(self)
    rescue
      # Release the socket even though the callback failed, then surface
      # the callback's error to the caller.
      super
      raise
    end
    super
  end
end

#data(&callback) ⇒ Object



600
601
602
603
604
605
606
607
608
609
610
# File 'lib/fluent/plugin_helper/server.rb', line 600

# Registers the block that receives data read from this connection.
#
# The block's arity selects which reader is installed as the singleton
# method #on_read: a one-parameter block is served by
# #on_read_without_connection, a two-parameter block by
# #on_read_with_connection.
#
# The callback is stored in @data_callback only after it has been
# validated, so a rejected block leaves no data callback behind.
#
# @yield [data] or [data, connection]
# @raise [ArgumentError] if no block is given
# @raise [RuntimeError] if #on_read is already defined on this object, or
#   the block's arity is neither 1 nor 2
def data(&callback)
  raise "data callback can be registered just once, but registered twice" if self.singleton_methods.include?(:on_read)
  raise ArgumentError, "data callback block is required" unless callback

  on_read_impl = case callback.arity
                 when 1 then :on_read_without_connection
                 when 2 then :on_read_with_connection
                 else
                   raise "BUG: callback block must have 1 or 2 arguments"
                 end
  @data_callback = callback
  self.define_singleton_method(:on_read, method(on_read_impl))
end

#on_connect ⇒ Object



623
624
625
626
627
628
629
# File 'lib/fluent/plugin_helper/server.rb', line 623

# Builds the callback connection for this handler and passes it to the
# connect callback, which is expected to register a data callback via #data.
#
# @raise [RuntimeError] if @data_callback is still unset after the connect
#   callback returns
def on_connect
  @callback_connection = TCPCallbackSocket.new(self)
  @connect_callback.call(@callback_connection)
  raise "connection callback must call #data to set data callback" unless @data_callback
end

#on_read_with_connection(data) ⇒ Object



640
641
642
643
644
645
646
647
# File 'lib/fluent/plugin_helper/server.rb', line 640

# Passes received data, together with the callback connection, to the
# registered data callback.
#
# A StandardError raised by the callback is logged along with the callback
# connection's remote_host and remote_port. The connection is then closed on
# a best-effort basis (errors from #close are ignored), and the error is
# re-raised only when @under_plugin_development is set.
#
# @param data the received data handed to the callback
# @return the callback's result, or nil when an error was handled
def on_read_with_connection(data)
  begin
    @data_callback.call(data, @callback_connection)
  rescue => e
    conn = @callback_connection
    @log.error("unexpected error on reading data", host: conn.remote_host, port: conn.remote_port, error: e)
    @log.error_backtrace
    begin
      close
    rescue StandardError
      nil
    end
    raise if @under_plugin_development
  end
end

#on_read_without_connection(data) ⇒ Object



631
632
633
634
635
636
637
638
# File 'lib/fluent/plugin_helper/server.rb', line 631

# Passes received data to the registered data callback (data only, no
# connection argument).
#
# A StandardError raised by the callback is logged along with the callback
# connection's remote_host and remote_port. The connection is then closed on
# a best-effort basis (errors from #close are ignored), and the error is
# re-raised only when @under_plugin_development is set.
#
# @param data the received data handed to the callback
# @return the callback's result, or nil when an error was handled
def on_read_without_connection(data)
  begin
    @data_callback.call(data)
  rescue => e
    conn = @callback_connection
    @log.error("unexpected error on reading data", host: conn.remote_host, port: conn.remote_port, error: e)
    @log.error_backtrace
    begin
      close
    rescue StandardError
      nil
    end
    raise if @under_plugin_development
  end
end

#on_writable ⇒ Object



618
619
620
621
# File 'lib/fluent/plugin_helper/server.rb', line 618

# Runs the superclass #on_writable, then closes this connection when
# @close_after_write_complete is set.
#
# @return the result of #close, or nil when no close was requested
def on_writable
  super
  return unless @close_after_write_complete

  close
end

#to_io ⇒ Object



596
597
598
# File 'lib/fluent/plugin_helper/server.rb', line 596

# Returns @_handler_socket, the socket that was passed to #initialize.
def to_io
  @_handler_socket
end

#write(data) ⇒ Object



612
613
614
615
616
# File 'lib/fluent/plugin_helper/server.rb', line 612

# Delegates to the superclass #write while holding @mutex, the same mutex
# #close takes, so the two never run at the same time.
#
# @param data the data forwarded unchanged to the superclass #write
# @return the superclass #write result
def write(data)
  @mutex.synchronize { super }
end