Class: Revactor::TCP::Socket
- Inherits:
-
Rev::TCPSocket
- Object
- Rev::TCPSocket
- Revactor::TCP::Socket
- Defined in:
- lib/revactor/tcp.rb,
lib/revactor/mongrel.rb
Overview
TCP socket class, returned by Revactor::TCP.connect and Revactor::TCP::Listener#accept
Instance Attribute Summary
-
#controller ⇒ Object
Returns the value of attribute controller.
Class Method Summary
-
.connect(host, port, options = {}) ⇒ Object
Connect to the specified host and port.
Instance Method Summary
-
#active=(state) ⇒ Object
Enable or disable active mode data reception.
-
#active? ⇒ Boolean
Is the socket in active mode?
-
#initialize(socket, options = {}) ⇒ Socket
constructor
A new instance of Socket.
- #inspect ⇒ Object
-
#read(length = nil, options = {}) ⇒ Object
Read data from the socket synchronously.
-
#readpartial(value = nil) ⇒ Object
Monkeypatched readpartial routine inserted whenever Revactor’s mongrel.rb is loaded.
-
#write(data, options = {}) ⇒ Object
(also: #<<)
Write data to the socket.
Constructor Details
#initialize(socket, options = {}) ⇒ Socket
Returns a new instance of Socket.
# File 'lib/revactor/tcp.rb', line 103
def initialize(socket, options = {})
  super(socket)

  @active ||= options[:active] || false
  @controller ||= options[:controller] || Actor.current
  @filterset ||= [*initialize_filter(options[:filter])]

  @receiver = @controller
  @read_buffer = IO::Buffer.new
end
Instance Attribute Details
#controller ⇒ Object
Returns the value of attribute controller.
# File 'lib/revactor/tcp.rb', line 75
def controller
  @controller
end
Class Method Details
.connect(host, port, options = {}) ⇒ Object
Connect to the specified host and port. Host may be a domain name or IP address. Accepts the following options:
:active - Controls how data is read from the socket. See the
documentation for #active=
:controller - The controlling actor, default Actor.current
:filter - A symbol/class, or an array of symbols/classes, implementing
#encode and #decode methods that transform data sent and
received, respectively, via Revactor::TCP::Socket.
See the "Filters" section in the README for more information.
# File 'lib/revactor/tcp.rb', line 91
def connect(host, port, options = {})
  options[:active] ||= false
  options[:controller] ||= Actor.current

  super.instance_eval {
    @active, @controller = options[:active], options[:controller]
    @filterset = [*initialize_filter(options[:filter])]
    self
  }
end
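The :filter option expects objects that respond to #encode and #decode. As a rough illustration of that contract, the sketch below defines a hypothetical LineFilter that frames messages with a trailing newline. It is not one of Revactor's own filter classes, and the exact signatures used here (encode taking a single string, decode returning an array of complete messages) are assumptions; the README's "Filters" section remains the authority.

```ruby
# Hypothetical filter for illustration only; not part of Revactor.
class LineFilter
  def initialize
    @buffer = "".dup # holds any partial line between decode calls
  end

  # Append the line delimiter to outgoing data.
  def encode(message)
    "#{message}\n"
  end

  # Accumulate incoming data and return every complete line seen so far.
  def decode(data)
    @buffer << data
    lines = []
    while (index = @buffer.index("\n"))
      lines << @buffer.slice!(0..index).chomp
    end
    lines
  end
end

filter = LineFilter.new
wire = filter.encode("hello") + filter.encode("world")

# Data may arrive split at arbitrary points; decode reassembles it.
first_chunk  = filter.decode(wire[0, 8])
second_chunk = filter.decode(wire[8..-1])
```

Keeping the partial line inside the filter means the socket's controller only ever sees whole messages, regardless of how the data was split on the wire.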
Instance Method Details
#active=(state) ⇒ Object
Enable or disable active mode data reception. State can be any of the following:
true - All received data is sent to the controlling actor
false - Receiving data is disabled
:once - A single message will be sent to the controlling actor
then active mode will be disabled
# File 'lib/revactor/tcp.rb', line 126
def active=(state)
  unless @receiver == @controller
    raise "cannot change active state during a synchronous call"
  end

  unless [true, false, :once].include? state
    raise ArgumentError, "must be true, false, or :once"
  end

  if [true, :once].include?(state)
    unless @read_buffer.empty?
      @receiver << [:tcp, self, @read_buffer.read]
      return if state == :once
    end

    enable unless enabled?
  end

  @active = state
end
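The three states can be easier to follow in a simplified, self-contained model. The ToySocket class below is a hypothetical stand-in, not a Revactor class: its mailbox array plays the role of the controlling actor, and its pending queue plays the role of data waiting to be read. It deliberately ignores the event loop and the synchronous-call check, so treat it as a sketch of the semantics only.

```ruby
# Toy model of active mode; ToySocket is hypothetical, not Revactor code.
class ToySocket
  attr_reader :active, :mailbox

  def initialize
    @active = false
    @mailbox = [] # stands in for the controlling actor's mailbox
    @pending = [] # data that arrived while reception was disabled
  end

  def active=(state)
    unless [true, false, :once].include?(state)
      raise ArgumentError, "must be true, false, or :once"
    end
    @active = state
    flush
  end

  # Simulate data arriving from the network.
  def receive(data)
    @pending << data
    flush
  end

  private

  # Deliver pending data while the socket is in an active state.
  def flush
    until @pending.empty? || @active == false
      @mailbox << [:tcp, self, @pending.shift]
      @active = false if @active == :once # :once delivers a single message
    end
  end
end

sock = ToySocket.new
sock.receive("a")   # active is false, so nothing is delivered yet
sock.active = :once # delivers "a", then reverts to false
sock.receive("b")   # held again
sock.active = true  # delivers "b" and everything that follows
sock.receive("c")

delivered = sock.mailbox.map { |_, _, data| data }
```

The :once state is useful for flow control: the controller asks for exactly one message, processes it, and re-arms the socket only when it is ready for more.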
#active? ⇒ Boolean
Is the socket in active mode?
# File 'lib/revactor/tcp.rb', line 148
def active?; @active; end
#inspect ⇒ Object
# File 'lib/revactor/tcp.rb', line 114
def inspect
  "#<#{self.class}:0x#{object_id.to_s(16)} #{@remote_host}:#{@remote_port}>"
end
#read(length = nil, options = {}) ⇒ Object
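Read data from the socket synchronously. If a length is specified, the call blocks until the given length has been read; otherwise it blocks until any data arrives. As the source below shows, passing a :timeout option raises ReadError when the wait expires, and EOFError is raised if the connection closes first.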
# File 'lib/revactor/tcp.rb', line 161
def read(length = nil, options = {})
  # Only one synchronous call allowed at a time
  raise "already being called synchronously" unless @receiver == @controller

  unless @read_buffer.empty? or (length and @read_buffer.size < length)
    return @read_buffer.read(length)
  end

  active = @active
  @active = :once
  @receiver = Actor.current
  enable unless enabled?

  loop do
    Actor.receive do |filter|
      filter.when(T[:tcp, self]) do |_, _, data|
        if length.nil?
          @receiver = @controller
          @active = active
          enable if @active

          return data
        end

        @read_buffer << data

        if @read_buffer.size >= length
          @receiver = @controller
          @active = active
          enable if @active

          return @read_buffer.read(length)
        end
      end

      filter.when(T[:tcp_closed, self]) do
        unless @receiver == @controller
          @receiver = @controller
          @receiver << T[:tcp_closed, self]
        end

        raise EOFError, "connection closed"
      end

      if timeout = options[:timeout]
        filter.after(timeout) { raise ReadError, "read timed out" }
      end
    end
  end
end
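The length-based branch of read amounts to "keep buffering chunks until enough data exists, then hand back exactly that much". A minimal sketch of that idea follows, using a hypothetical read_exactly helper and a plain array of chunks in place of actor messages. None of these names exist in Revactor, and timeouts are left out.

```ruby
# Hypothetical helper, not Revactor code: collect chunks into `buffer`
# until `length` units are available, then return exactly that many.
def read_exactly(chunks, length, buffer = "".dup)
  # Serve the request from already-buffered data when possible.
  return buffer.slice!(0, length) if buffer.size >= length

  chunks.each do |chunk|
    buffer << chunk
    return buffer.slice!(0, length) if buffer.size >= length
  end

  # Running out of chunks models the peer closing the connection.
  raise EOFError, "connection closed"
end

buffer = "".dup
result = read_exactly(["ab", "cde", "f"], 4, buffer)
leftover = buffer.dup # data beyond the requested length stays buffered
```

Anything received beyond the requested length stays in the buffer, which is why the real method checks the buffer before waiting for new data.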
#readpartial(value = nil) ⇒ Object
Monkeypatched readpartial routine inserted whenever Revactor’s mongrel.rb is loaded. The value passed to this method is ignored, so it is not fully compatible with Socket’s readpartial method.
Mongrel doesn’t care whether we read more than Const::CHUNK_SIZE, and readpartial adds little to Revactor’s API because read already provides the same functionality. In this implementation, readpartial therefore just calls read and returns whatever is available.
# File 'lib/revactor/mongrel.rb', line 13
def readpartial(value = nil)
  read
end
#write(data, options = {}) ⇒ Object Also known as: <<
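Write data to the socket. The call blocks until all data has been written. As the source below shows, passing a :timeout option raises WriteError when the wait expires, and EOFError is raised if the connection closes first.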
# File 'lib/revactor/tcp.rb', line 213
def write(data, options = {})
  # Only one synchronous call allowed at a time
  raise "already being called synchronously" unless @receiver == @controller

  active = @active
  @active = false
  @receiver = Actor.current
  disable if @active

  super(encode(data))

  Actor.receive do |filter|
    filter.when(T[:tcp_write_complete, self]) do
      @receiver = @controller
      @active = active
      enable if @active and not enabled?

      return data.size
    end

    filter.when(T[:tcp_closed, self]) do
      @active = false
      raise EOFError, "connection closed"
    end

    if timeout = options[:timeout]
      filter.after(timeout) { raise WriteError, "write timed out" }
    end
  end
end