Class: EventMachine::StreamObject
- Inherits: Selectable
  - Object
  - Selectable
  - EventMachine::StreamObject
- Defined in: lib/em/pure_ruby.rb
Direct Known Subclasses
Instance Attribute Summary
Attributes inherited from Selectable
Instance Method Summary
- #eventable_read ⇒ Object
  Reads available data and dispatches it as ConnectionData events. Proper nonblocking I/O was added to Ruby 1.8.4 in May 2006.
- #eventable_write ⇒ Object
  Provisional implementation.
- #get_outbound_data_size ⇒ Object
  Returns the combined length of all data waiting in the outbound queue.
- #get_peername ⇒ Object
  Defined in the normal way on connected stream objects.
- #get_sockname ⇒ Object
  Defined in the normal way on connected stream objects.
- #heartbeat ⇒ Object
- #initialize(io) ⇒ StreamObject (constructor)
  A new instance of StreamObject.
- #select_for_reading? ⇒ Boolean
  If we have to close, or a close-after-writing has been requested, then don’t read any more data.
- #select_for_writing? ⇒ Boolean
  If we have to close, don’t select for writing.
- #send_data(data) ⇒ Object
  Appends data to the outbound queue unless a close has been scheduled or requested.
Methods inherited from Selectable
#close_scheduled?, #schedule_close, #set_inactivity_timeout
Constructor Details
#initialize(io) ⇒ StreamObject
Returns a new instance of StreamObject.
# File 'lib/em/pure_ruby.rb', line 774
def initialize io
  super io
  @outbound_q = []
end
Instance Method Details
#eventable_read ⇒ Object
Proper nonblocking I/O was added to Ruby 1.8.4 in May 2006. If it is available (the IO object responds to read_nonblock), we can safely read multiple times per call to improve performance. The last-activity clock assumes that we only get here after the socket has been selected as readable. TODO: coalesce multiple reads into a single event. TODO: do the function check somewhere else and cache it.
# File 'lib/em/pure_ruby.rb', line 808
def eventable_read
  @last_activity = Reactor.instance.current_loop_time
  begin
    if io.respond_to?(:read_nonblock)
      10.times {
        data = io.read_nonblock(4096)
        EventMachine::event_callback uuid, ConnectionData, data
      }
    else
      data = io.sysread(4096)
      EventMachine::event_callback uuid, ConnectionData, data
    end
  rescue Errno::EAGAIN, Errno::EWOULDBLOCK, SSLConnectionWaitReadable
    # no-op
  rescue Errno::ECONNRESET, Errno::ECONNREFUSED, EOFError, Errno::EPIPE, OpenSSL::SSL::SSLError
    @close_scheduled = true
    EventMachine::event_callback uuid, ConnectionUnbound, nil
  end
end
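The bounded read loop above can be illustrated outside the reactor. The following is a minimal sketch, not EventMachine code: `drain_readable` and its `max_reads` and `chunk_size` parameters are hypothetical names, and a plain pipe stands in for the socket. It reads up to a fixed number of chunks and stops quietly when the descriptor would block.

```ruby
# Hypothetical helper (not part of EventMachine): read up to `max_reads`
# chunks from an IO that select() has reported as readable.
def drain_readable(io, max_reads: 10, chunk_size: 4096)
  chunks = []
  max_reads.times do
    chunks << io.read_nonblock(chunk_size)
  end
  [chunks, :open]
rescue IO::WaitReadable
  # No more data right now; try again on the next select cycle.
  [chunks, :open]
rescue EOFError, Errno::ECONNRESET, Errno::EPIPE
  # The peer closed or the connection broke; the caller should schedule a close.
  [chunks, :closed]
end

reader, writer = IO.pipe
writer.write("hello")
chunks, state = drain_readable(reader)   # => [["hello"], :open]
writer.close
chunks, state = drain_readable(reader)   # => [[], :closed]
```

Returning the chunks instead of firing a callback per read is one way to approach the "coalesce multiple reads into a single event" TODO, since the caller can join them before dispatching.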
#eventable_write ⇒ Object
Provisional implementation. Will be re-implemented in subclasses. TODO: Complete this implementation. As it stands, this only writes a single packet per cycle, which is highly inefficient but required unless we’re running on a Ruby with proper nonblocking I/O (Ruby 1.8.4 built from sources dated May 25, 2006 or newer). The loop should be improved so that it writes multiple times per cycle, but no more than a certain number of bytes; otherwise one busy connection could hog the output buffers and slow down other connections. URGENT TODO: Coalesce small writes. They are a performance killer. The last-activity recorder assumes that we only get here after the socket has been selected as writable.
# File 'lib/em/pure_ruby.rb', line 841
def eventable_write
  # coalesce the outbound array here, perhaps
  @last_activity = Reactor.instance.current_loop_time
  while data = @outbound_q.shift do
    begin
      data = data.to_s
      w = if io.respond_to?(:write_nonblock)
        io.write_nonblock data
      else
        io.syswrite data
      end

      if w < data.length
        @outbound_q.unshift data[w..-1]
        break
      end
    rescue Errno::EAGAIN, SSLConnectionWaitReadable, SSLConnectionWaitWritable
      @outbound_q.unshift data
      break
    rescue EOFError, Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::EPIPE, OpenSSL::SSL::SSLError
      @close_scheduled = true
      @outbound_q.clear
    end
  end
end
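The TODOs above ask for coalesced writes and a per-cycle byte limit. One possible shape for that is sketched below. It is an assumption, not the library's implementation: `flush_outbound` and its `budget` parameter are hypothetical, a pipe stands in for the socket, and error handling for broken connections is left out. The sketch uses `bytesize` and `byteslice` because `write_nonblock` returns a byte count.

```ruby
# Hypothetical sketch (not part of EventMachine): join all queued chunks
# into one string and write at most `budget` bytes of it per cycle.
def flush_outbound(io, queue, budget: 16 * 1024)
  return 0 if queue.empty?

  # Coalesce: one write call instead of one per small chunk.
  pending = queue.join
  queue.clear

  written = 0
  begin
    written = io.write_nonblock(pending.byteslice(0, budget))
  rescue IO::WaitWritable
    # Kernel buffer is full; everything stays queued for the next cycle.
  end

  # Put whatever was not written back at the front of the queue.
  remainder = pending.byteslice(written, pending.bytesize - written)
  queue.unshift(remainder) unless remainder.empty?
  written
end

reader, writer = IO.pipe
queue = ["ab", "cd", "ef"]
flush_outbound(writer, queue, budget: 4)   # writes "abcd", queue is now ["ef"]
flush_outbound(writer, queue, budget: 4)   # writes "ef", queue is now []
```

Capping the bytes per cycle keeps one busy connection from monopolizing the loop, at the cost of needing more cycles to drain a large queue.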
#get_outbound_data_size ⇒ Object
Returns the combined length of all data waiting in the outbound queue. Nil entries count as zero.
# File 'lib/em/pure_ruby.rb', line 893
def get_outbound_data_size
  @outbound_q.inject(0) {|memo,obj| memo += (obj || "").length}
end
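The sum above uses String#length, which counts characters on Ruby 1.9 and later. If the number of bytes still to be sent is what matters, `bytesize` may be the more accurate measure. The sketch below contrasts the two; `queued_chars` and `queued_bytes` are hypothetical helper names, not part of the library.

```ruby
# Hypothetical helpers (not part of EventMachine).
def queued_chars(queue)
  queue.inject(0) { |total, chunk| total + (chunk || "").length }
end

def queued_bytes(queue)
  queue.inject(0) { |total, chunk| total + (chunk || "").bytesize }
end

queue = ["h\u00e9llo", nil, "ab"]   # the accented character is two bytes in UTF-8
puts queued_chars(queue)            # prints 7
puts queued_bytes(queue)            # prints 8
```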
#get_peername ⇒ Object
Defined in the normal way on connected stream objects. Returns an object that is suitable for passing to Socket.unpack_sockaddr_in or variants. We could also use a convenience method that did the unpacking automatically.
# File 'lib/em/pure_ruby.rb', line 880
def get_peername
  io.getpeername
end
#get_sockname ⇒ Object
Defined in the normal way on connected stream objects. Returns an object that is suitable for passing to Socket.unpack_sockaddr_in or variants. We could also use a convenience method that did the unpacking automatically.
# File 'lib/em/pure_ruby.rb', line 888
def get_sockname
  io.getsockname
end
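Both methods return a packed sockaddr string. A convenience method of the kind mentioned above might look like the following sketch. `unpack_endpoint` is a hypothetical name, and the example assumes an IPv4 TCP connection over the loopback interface.

```ruby
require 'socket'

# Hypothetical convenience helper (not part of EventMachine):
# turn a packed sockaddr into a [host, port] pair.
def unpack_endpoint(packed_sockaddr)
  port, host = Socket.unpack_sockaddr_in(packed_sockaddr)
  [host, port]
end

server = TCPServer.new('127.0.0.1', 0)   # port 0 lets the OS pick a free port
server_port = server.addr[1]
client = TCPSocket.new('127.0.0.1', server_port)
accepted = server.accept

# The client's peer and the accepted socket's local name are the same endpoint.
peer_host, peer_port = unpack_endpoint(client.getpeername)
local_host, local_port = unpack_endpoint(accepted.getsockname)

[client, accepted, server].each(&:close)
```

Note that Socket.unpack_sockaddr_in returns the port first and the host second, which is why the helper swaps them.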
#heartbeat ⇒ Object
# File 'lib/em/pure_ruby.rb', line 897
def heartbeat
  if @inactivity_timeout and @inactivity_timeout > 0 and (@last_activity + @inactivity_timeout) < Reactor.instance.current_loop_time
    schedule_close true
  end
end
#select_for_reading? ⇒ Boolean
If we have to close, or a close-after-writing has been requested, then don’t read any more data.
# File 'lib/em/pure_ruby.rb', line 781
def select_for_reading?
  true unless (@close_scheduled || @close_requested)
end
#select_for_writing? ⇒ Boolean
If we have to close, don’t select for writing. Otherwise, see if the protocol is ready to close. If not, see if it has data to send. If a close-after-writing has been requested and the outbound queue is empty, convert the status to close_scheduled.
# File 'lib/em/pure_ruby.rb', line 790
def select_for_writing?
  unless @close_scheduled
    if @outbound_q.empty?
      @close_scheduled = true if @close_requested
      false
    else
      true
    end
  end
end
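The close-after-writing transition can be traced with a small stand-in object. This is an illustrative sketch only: `WriteGate`, `close_after_writing`, and `drain` are hypothetical names, and the class keeps just the two flags and the queue rather than a real socket. Unlike the original, it returns false instead of nil once a close is scheduled.

```ruby
# Hypothetical stand-in for a stream object (not part of EventMachine).
class WriteGate
  attr_reader :close_scheduled

  def initialize
    @outbound_q = []
    @close_scheduled = false
    @close_requested = false
  end

  # Queue data unless the connection is closing or the data is empty.
  def send_data(data)
    return if @close_scheduled || @close_requested || data.to_s.empty?
    @outbound_q << data.to_s
  end

  # Ask for the connection to close once the queue has been flushed.
  def close_after_writing
    @close_requested = true
  end

  # Pretend every queued chunk was written to the socket.
  def drain
    @outbound_q.clear
  end

  def select_for_writing?
    return false if @close_scheduled
    if @outbound_q.empty?
      # Nothing left to send: a requested close becomes a scheduled close.
      @close_scheduled = true if @close_requested
      false
    else
      true
    end
  end
end

gate = WriteGate.new
gate.send_data("bye")
gate.close_after_writing
gate.select_for_writing?   # => true, data is still queued
gate.drain
gate.select_for_writing?   # => false, and close_scheduled is now true
```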
#send_data(data) ⇒ Object
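Appends data to the outbound queue. The data is ignored if a close has been scheduled or requested, or if it is nil or empty.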
# File 'lib/em/pure_ruby.rb', line 869
def send_data data
  # TODO, coalesce here perhaps by being smarter about appending to @outbound_q.last?
  unless @close_scheduled or @close_requested or !data or data.length <= 0
    @outbound_q << data.to_s
  end
end