Class: Fluent::PluginHelper::Server::EventHandler::TCPServer

Inherits:
Coolio::TCPSocket
  • Object
show all
Defined in:
lib/fluent/plugin_helper/server.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sock, socket_option_setter, close_callback, log, under_plugin_development, connect_callback) ⇒ TCPServer

Returns a new instance of TCPServer.

Raises:

  • (ArgumentError)


555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
# File 'lib/fluent/plugin_helper/server.rb', line 555

def initialize(sock, socket_option_setter, close_callback, log, under_plugin_development, connect_callback)
  raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}" unless sock.is_a?(TCPSocket)

  socket_option_setter.call(sock)

  @_handler_socket = sock
  super(sock)

  @log = log
  @under_plugin_development = under_plugin_development

  @connect_callback = connect_callback
  @data_callback = nil
  @close_callback = close_callback

  @callback_connection = nil
  @close_after_write_complete = false
  @closing = false

  @mutex = Mutex.new # to serialize #write and #close
end

Instance Attribute Details

#close_after_write_complete=(value) ⇒ Object (writeonly)

Sets the attribute close_after_write_complete

Parameters:

  • value

    the value to set the attribute close_after_write_complete to.



553
554
555
# File 'lib/fluent/plugin_helper/server.rb', line 553

def close_after_write_complete=(value)
  @close_after_write_complete = value
end

#closingObject (readonly)

Returns the value of attribute closing.



552
553
554
# File 'lib/fluent/plugin_helper/server.rb', line 552

def closing
  @closing
end

Instance Method Details

#closeObject



630
631
632
633
634
635
636
637
# File 'lib/fluent/plugin_helper/server.rb', line 630

def close
  @mutex.synchronize do
    return if @closing
    @closing = true
    @close_callback.call(self)
    super
  end
end

#data(&callback) ⇒ Object



581
582
583
584
585
586
587
588
589
590
591
# File 'lib/fluent/plugin_helper/server.rb', line 581

def data(&callback)
  raise "data callback can be registered just once, but registered twice" if self.singleton_methods.include?(:on_read)
  @data_callback = callback
  on_read_impl = case callback.arity
                 when 1 then :on_read_without_connection
                 when 2 then :on_read_with_connection
                 else
                   raise "BUG: callback block must have 1 or 2 arguments"
                 end
  self.define_singleton_method(:on_read, method(on_read_impl))
end

#on_connectObject



604
605
606
607
608
609
610
# File 'lib/fluent/plugin_helper/server.rb', line 604

def on_connect
  @callback_connection = TCPCallbackSocket.new(self)
  @connect_callback.call(@callback_connection)
  unless @data_callback
    raise "connection callback must call #data to set data callback"
  end
end

#on_read_with_connection(data) ⇒ Object



621
622
623
624
625
626
627
628
# File 'lib/fluent/plugin_helper/server.rb', line 621

def on_read_with_connection(data)
  @data_callback.call(data, @callback_connection)
rescue => e
  @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
  @log.error_backtrace
  close rescue nil
  raise if @under_plugin_development
end

#on_read_without_connection(data) ⇒ Object



612
613
614
615
616
617
618
619
# File 'lib/fluent/plugin_helper/server.rb', line 612

def on_read_without_connection(data)
  @data_callback.call(data)
rescue => e
  @log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
  @log.error_backtrace
  close rescue nil
  raise if @under_plugin_development
end

#on_writableObject



599
600
601
602
# File 'lib/fluent/plugin_helper/server.rb', line 599

def on_writable
  super
  close if @close_after_write_complete
end

#to_ioObject



577
578
579
# File 'lib/fluent/plugin_helper/server.rb', line 577

def to_io
  @_handler_socket
end

#write(data) ⇒ Object



593
594
595
596
597
# File 'lib/fluent/plugin_helper/server.rb', line 593

def write(data)
  @mutex.synchronize do
    super
  end
end