Class: EventMachine::StreamObject

Inherits:
Selectable
Defined in:
lib/em/pure_ruby.rb

Direct Known Subclasses

EvmaKeyboard, EvmaTCPClient, EvmaUNIXClient

Instance Attribute Summary

Attributes inherited from Selectable

#io, #is_server, #uuid

Instance Method Summary

Methods inherited from Selectable

#close_scheduled?, #schedule_close, #set_inactivity_timeout

Constructor Details

#initialize(io) ⇒ StreamObject

Returns a new instance of StreamObject.



# File 'lib/em/pure_ruby.rb', line 774

def initialize io
  super io
  @outbound_q = []
end

Instance Method Details

#eventable_read ⇒ Object

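Reads pending data from the descriptor and reports each chunk through a ConnectionData event. Proper nonblocking I/O was added to Ruby 1.8.4 in May 2006. When io responds to read_nonblock, this method reads up to 10 times per call (at most 4096 bytes per read) to improve performance; otherwise it falls back to a single sysread. The last-activity clock assumes that we only get here after the descriptor has been selected as readable.

TODO: coalesce multiple reads into a single event.
TODO: do the function check somewhere else and cache it.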



# File 'lib/em/pure_ruby.rb', line 808

def eventable_read
  @last_activity = Reactor.instance.current_loop_time
  begin
    if io.respond_to?(:read_nonblock)
      10.times {
        data = io.read_nonblock(4096)
        EventMachine::event_callback uuid, ConnectionData, data
      }
    else
      data = io.sysread(4096)
      EventMachine::event_callback uuid, ConnectionData, data
    end
  rescue Errno::EAGAIN, Errno::EWOULDBLOCK, SSLConnectionWaitReadable
    # no-op
  rescue Errno::ECONNRESET, Errno::ECONNREFUSED, EOFError, Errno::EPIPE, OpenSSL::SSL::SSLError
    @close_scheduled = true
    EventMachine::event_callback uuid, ConnectionUnbound, nil
  end

end
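
The bounded read loop above can be tried outside the reactor. The following is a minimal sketch, assuming a plain pipe instead of a socket; drain_readable and its keyword arguments are hypothetical names chosen for illustration and are not part of EventMachine. It rescues IO::WaitReadable, which Ruby mixes into the EAGAIN/EWOULDBLOCK errors raised by read_nonblock.

```ruby
# Hypothetical helper, not part of EventMachine: drain a readable IO in a
# bounded number of nonblocking reads, the way eventable_read does.
def drain_readable(io, max_reads: 10, chunk_size: 4096)
  chunks = []
  closed = false
  begin
    max_reads.times { chunks << io.read_nonblock(chunk_size) }
  rescue IO::WaitReadable
    # Nothing more to read right now; wait for the next select cycle.
  rescue EOFError, Errno::ECONNRESET, Errno::EPIPE
    closed = true # the other end is gone; the caller should unbind
  end
  [chunks, closed]
end

reader, writer = IO.pipe
writer.write("hello")
p drain_readable(reader) # => [["hello"], false]

writer.close
p drain_readable(reader) # => [[], true]
```

Capping the number of reads per call keeps one busy descriptor from starving the others that were selected in the same cycle.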

#eventable_write ⇒ Object

Provisional implementation, to be re-implemented in subclasses.

TODO: complete this implementation. As it stands, this only writes a single packet per cycle. That is highly inefficient, but required unless we’re running on a Ruby with proper nonblocking I/O (Ruby 1.8.4 built from sources dated May 25, 2006 or newer). The loop should write multiple times per cycle, but never more than a fixed number of bytes, so that one busy connection cannot hog the output buffers and slow down other connections.

URGENT TODO: coalesce small writes. They are a performance killer.

The last-activity recorder assumes that we only get here after the descriptor has been selected as writable.



# File 'lib/em/pure_ruby.rb', line 841

def eventable_write
  # coalesce the outbound array here, perhaps
  @last_activity = Reactor.instance.current_loop_time
  while data = @outbound_q.shift do
    begin
      data = data.to_s
      w = if io.respond_to?(:write_nonblock)
            io.write_nonblock data
          else
            io.syswrite data
          end

      if w < data.length
        @outbound_q.unshift data[w..-1]
        break
      end
    rescue Errno::EAGAIN, SSLConnectionWaitReadable, SSLConnectionWaitWritable
      @outbound_q.unshift data
      break
    rescue EOFError, Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::EPIPE, OpenSSL::SSL::SSLError
      @close_scheduled = true
      @outbound_q.clear
    end
  end

end
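
The partial-write handling can be illustrated in isolation. This is a sketch under stated assumptions: ThrottledSink is a hypothetical stand-in for a socket that accepts only a few bytes per call, and flush_queue is a hypothetical name. The sketch counts in bytes (bytesize, byteslice) because IO#write_nonblock reports the number of bytes written; that is a choice made here and differs from the length-based comparison in the source above.

```ruby
# Hypothetical stand-in for a socket: accepts at most `limit` bytes per call.
class ThrottledSink
  attr_reader :received

  def initialize(limit)
    @limit = limit
    @received = String.new
  end

  # Mimics IO#write_nonblock by returning the number of bytes accepted.
  def write_nonblock(data)
    accepted = data.byteslice(0, @limit)
    @received << accepted
    accepted.bytesize
  end
end

# Hypothetical helper: flush queued strings until the first partial write.
def flush_queue(io, queue)
  while (data = queue.shift)
    data = data.to_s
    written = io.write_nonblock(data)
    if written < data.bytesize
      # Put the unsent tail back at the front so ordering is preserved.
      queue.unshift(data.byteslice(written..-1))
      break
    end
  end
  queue
end

sink = ThrottledSink.new(4)
queue = ["hello", "world"]
p flush_queue(sink, queue) # => ["o", "world"]
p flush_queue(sink, queue) # => ["d"]
p flush_queue(sink, queue) # => []
p sink.received            # => "helloworld"
```

Each call corresponds to one write cycle: the loop stops as soon as the sink takes less than it was offered, and the remainder waits for the next cycle.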

#get_outbound_data_size ⇒ Object

Returns the combined length of all entries currently held in the outbound queue. A nil entry counts as zero.



# File 'lib/em/pure_ruby.rb', line 893

def get_outbound_data_size
  @outbound_q.inject(0) {|memo,obj| memo += (obj || "").length}
end

#get_peername ⇒ Object

Returns the address of the remote end of the connection, as reported by io.getpeername. This is defined in the normal way on connected stream objects. The return value is suitable for passing to Socket.unpack_sockaddr_in or its variants. A convenience method that does the unpacking automatically would also be useful.



# File 'lib/em/pure_ruby.rb', line 880

def get_peername
  io.getpeername
end

#get_sockname ⇒ Object

Returns the address of the local end of the connection, as reported by io.getsockname. This is defined in the normal way on connected stream objects. The return value is suitable for passing to Socket.unpack_sockaddr_in or its variants. A convenience method that does the unpacking automatically would also be useful.



# File 'lib/em/pure_ruby.rb', line 888

def get_sockname
  io.getsockname
end
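
The packed addresses returned by get_peername and get_sockname can be decoded with Socket.unpack_sockaddr_in, which returns a port and an IP address. The following is a minimal sketch of the kind of convenience method the notes above wish for, assuming an IPv4 TCP connection on the loopback interface; describe_endpoints is a hypothetical name and is not part of EventMachine.

```ruby
require "socket"

# Hypothetical helper, not part of EventMachine: unpack both ends of a
# connected IPv4 socket into [host, port] pairs.
def describe_endpoints(sock)
  peer_port, peer_host = Socket.unpack_sockaddr_in(sock.getpeername)
  local_port, local_host = Socket.unpack_sockaddr_in(sock.getsockname)
  { peer: [peer_host, peer_port], local: [local_host, local_port] }
end

server = TCPServer.new("127.0.0.1", 0) # port 0 lets the OS pick a free port
client = TCPSocket.new("127.0.0.1", server.addr[1])
accepted = server.accept

info = describe_endpoints(accepted)
puts "local #{info[:local].join(':')}, peer #{info[:peer].join(':')}"

[accepted, client, server].each(&:close)
```

On the accepted socket, the local pair is the server's listening address and the peer pair is the client's ephemeral address.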

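#heartbeat ⇒ Object

Schedules a close when an inactivity timeout greater than zero is set and the last recorded activity plus that timeout is earlier than the reactor’s current loop time.
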
# File 'lib/em/pure_ruby.rb', line 897

def heartbeat
  if @inactivity_timeout and @inactivity_timeout > 0 and (@last_activity + @inactivity_timeout) < Reactor.instance.current_loop_time
    schedule_close true
  end
end
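
The timeout condition is easy to misread at its boundaries, so here it is as a standalone predicate. This is a sketch only: inactivity_expired? is a hypothetical name, and the numeric arguments are arbitrary time values in whatever unit the reactor clock uses.

```ruby
# Hypothetical pure function mirroring the condition tested in heartbeat.
def inactivity_expired?(last_activity, inactivity_timeout, now)
  # A nil or non-positive timeout disables the check entirely.
  return false unless inactivity_timeout && inactivity_timeout > 0

  (last_activity + inactivity_timeout) < now
end

p inactivity_expired?(100.0, 5, 104.0)   # => false (still within the timeout)
p inactivity_expired?(100.0, 5, 105.0)   # => false (the comparison is strict)
p inactivity_expired?(100.0, 5, 105.5)   # => true
p inactivity_expired?(100.0, nil, 999.0) # => false (no timeout configured)
p inactivity_expired?(100.0, 0, 999.0)   # => false (zero disables the timeout)
```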

#select_for_reading? ⇒ Boolean

If we have to close, or a close-after-writing has been requested, then don’t read any more data.

Returns:

  • (Boolean)


# File 'lib/em/pure_ruby.rb', line 781

def select_for_reading?
  true unless (@close_scheduled || @close_requested)
end

#select_for_writing? ⇒ Boolean

If we have to close, don’t select for writing. Otherwise, check whether the protocol is ready to close. If not, check whether it has data to send. If a close-after-writing has been requested and the outbound queue is empty, convert the status to close_scheduled.

Returns:

  • (Boolean)


# File 'lib/em/pure_ruby.rb', line 790

def select_for_writing?
  unless @close_scheduled
    if @outbound_q.empty?
      @close_scheduled = true if @close_requested
      false
    else
      true
    end
  end
end
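
The interplay between the two predicates during a close-after-writing is easier to follow as a walkthrough. This is a sketch, assuming a hypothetical class CloseStateSketch that copies only the two predicates and the state they consult; it is not the real StreamObject and performs no I/O.

```ruby
# Hypothetical stand-in: only the two predicates and the state they read.
class CloseStateSketch
  attr_accessor :close_scheduled, :close_requested
  attr_reader :outbound_q

  def initialize
    @outbound_q = []
    @close_scheduled = false
    @close_requested = false
  end

  def select_for_reading?
    true unless @close_scheduled || @close_requested
  end

  def select_for_writing?
    unless @close_scheduled
      if @outbound_q.empty?
        @close_scheduled = true if @close_requested
        false
      else
        true
      end
    end
  end
end

conn = CloseStateSketch.new
conn.outbound_q << "bye"
conn.close_requested = true  # close-after-writing has been requested

p conn.select_for_reading?   # => nil (falsy): stop reading immediately
p conn.select_for_writing?   # => true: queued data must still be flushed
conn.outbound_q.clear        # pretend a write cycle drained the queue
p conn.select_for_writing?   # => false, and the close becomes scheduled
p conn.close_scheduled       # => true
p conn.select_for_writing?   # => nil: nothing is selected after that
```

Both predicates return nil rather than false on their "closing" paths, so callers should rely on truthiness instead of comparing against false.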

#send_data(data) ⇒ Object

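Appends data, converted with to_s, to the outbound queue. The call is ignored if a close has been scheduled or requested, or if data is nil or empty.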



# File 'lib/em/pure_ruby.rb', line 869

def send_data data
  # TODO, coalesce here perhaps by being smarter about appending to @outbound_q.last?
  unless @close_scheduled or @close_requested or !data or data.length <= 0
    @outbound_q << data.to_s
  end
end