Class: Revactor::TCP::Socket

Inherits:
Rev::TCPSocket
  • Object
show all
Defined in:
lib/revactor/tcp.rb,
lib/revactor/mongrel.rb

Overview

TCP socket class, returned by Revactor::TCP.connect and Revactor::TCP::Listener#accept

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(socket, options = {}) ⇒ Socket

Returns a new instance of Socket.



103
104
105
106
107
108
109
110
111
112
# File 'lib/revactor/tcp.rb', line 103

# Hands +socket+ to the superclass constructor, then prepares the
# Revactor-specific state.
#
# Recognised +options+ keys are :active, :controller and :filter. Each of
# the three derived instance variables is only assigned when it does not
# already hold a truthy value; :active then falls back to false and
# :controller to Actor.current.
def initialize(socket, options = {})
  super(socket)

  @active     ||= options[:active] || false
  @controller ||= options[:controller] || Actor.current
  @filterset  ||= [*initialize_filter(options[:filter])]

  # Messages are addressed to the controller by default
  @receiver    = @controller
  @read_buffer = IO::Buffer.new
end

Instance Attribute Details

#controller ⇒ Object

Returns the value of attribute controller.



75
76
77
# File 'lib/revactor/tcp.rb', line 75

# Returns the value currently stored in @controller.
def controller; @controller; end

Class Method Details

.connect(host, port, options = {}) ⇒ Object

Connect to the specified host and port. Host may be a domain name or IP address. Accepts the following options:

:active - Controls how data is read from the socket.  See the
          documentation for #active=

:controller - The controlling actor, default Actor.current

:filter - A symbol/class or array of symbols/classes which implement 
          #encode and #decode methods to transform data sent and 
          received respectively via Revactor::TCP::Socket.
          See the "Filters" section in the README for more information


91
92
93
94
95
96
97
98
99
100
# File 'lib/revactor/tcp.rb', line 91

# Connect to +host+ / +port+ and configure the resulting socket.
#
# :active and :controller are defaulted (to false and Actor.current) directly
# in the +options+ hash that was passed in. The superclass +connect+ is then
# invoked with the same arguments, and the object it returns has its
# @active, @controller and @filterset set from +options+.
#
# Returns the object produced by the superclass +connect+.
def connect(host, port, options = {})
  options[:active]     ||= false
  options[:controller] ||= Actor.current

  # Zero-argument super forwards host, port and options unchanged
  socket = super

  socket.instance_eval do
    @active     = options[:active]
    @controller = options[:controller]
    @filterset  = [*initialize_filter(options[:filter])]
    self
  end
end

Instance Method Details

#active=(state) ⇒ Object

Enable or disable active mode data reception. State can be any of the following:

true - All received data is sent to the controlling actor
false - Receiving data is disabled
:once - A single message will be sent to the controlling actor
        then active mode will be disabled


126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/revactor/tcp.rb', line 126

# Switch active-mode data reception on or off.
#
# +state+ must be true, false or :once, otherwise ArgumentError is raised.
# A RuntimeError is raised when @receiver is not the controller, i.e. while
# a synchronous call owns the socket.
#
# When switching to true or :once, any data already sitting in the read
# buffer is sent to the receiver straight away. For :once that buffered
# message is the single delivery, so the method returns without touching
# @active.
def active=(state)
  raise "cannot change active state during a synchronous call" unless @receiver == @controller
  raise ArgumentError, "must be true, false, or :once" unless [true, false, :once].include?(state)

  wants_data = [true, :once].include?(state)

  if wants_data
    has_buffered = !@read_buffer.empty?
    @receiver << [:tcp, self, @read_buffer.read] if has_buffered

    # The buffered message already satisfied a :once request
    return if has_buffered && state == :once

    enable unless enabled?
  end

  @active = state
end

#active? ⇒ Boolean

Is the socket in active mode?

Returns:

  • (Boolean)


148
# File 'lib/revactor/tcp.rb', line 148

# Is the socket in active mode?
#
# Returns whatever is stored in @active without coercing it, so the result
# should be treated as truthy/falsy.
def active?
  @active
end

#inspect ⇒ Object



114
115
116
# File 'lib/revactor/tcp.rb', line 114

def inspect
  "#<#{self.class}:0x#{object_id.to_s(16)} #{@remote_host}:#{@remote_port}>"
end

#read(length = nil, options = {}) ⇒ Object

Read data from the socket synchronously. If a length is specified then the call blocks until the given length has been read. Otherwise the call blocks until it receives any data.



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/revactor/tcp.rb', line 161

# Read data from the socket synchronously.
#
# length  - number of bytes to wait for; with nil the call returns as soon
#           as any data is available.
# options - :timeout, handed to the receive filter's +after+. It is set up
#           again on every pass through the receive loop, so it bounds each
#           individual wait for data rather than the call as a whole.
#
# Returns the data that was read.
#
# Raises RuntimeError if another synchronous call currently owns the socket,
# EOFError if the connection is closed while waiting, and ReadError when the
# timeout expires.
def read(length = nil, options = {})
  # Only one synchronous call allowed at a time
  raise "already being called synchronously" unless @receiver == @controller

  # Serve the request straight from the buffer when it already holds enough
  unless @read_buffer.empty? or (length and @read_buffer.size < length)
    return @read_buffer.read(length)
  end

  active = @active

  # Gives the socket back to the controller in the mode it had before this
  # call. Reception is only switched on when it is not already on, matching
  # the guard used at the other enable call sites.
  restore = lambda do
    @receiver = @controller
    @active = active
    enable if @active and not enabled?
  end

  @receiver = Actor.current

  loop do
    # (Re-)arm single-message reception before every wait. A length-bounded
    # read can need several messages, and :once mode is documented as being
    # switched off after one message; when it is still armed this is a no-op.
    @active = :once
    enable unless enabled?

    Actor.receive do |filter|
      filter.when(T[:tcp, self]) do |_, _, data|
        if length.nil?
          restore.call
          return data
        end

        @read_buffer << data

        if @read_buffer.size >= length
          restore.call
          return @read_buffer.read(length)
        end
      end

      filter.when(T[:tcp_closed, self]) do
        # Pass the close notification on to the controller when a different
        # actor was the one waiting
        unless @receiver == @controller
          @receiver = @controller
          @receiver << T[:tcp_closed, self]
        end

        raise EOFError, "connection closed"
      end

      if timeout = options[:timeout]
        filter.after(timeout) do
          # Release the socket first; otherwise it would stay claimed by
          # this call and in :once mode after the error
          restore.call
          raise ReadError, "read timed out"
        end
      end
    end
  end
end

#readpartial(value = nil) ⇒ Object

Monkeypatched readpartial routine inserted whenever Revactor’s mongrel.rb is loaded. The value passed to this method is ignored, so it is not fully compatible with Socket’s readpartial method.

Mongrel doesn’t really care if we read more than Const::CHUNK_SIZE and readpartial doesn’t really make sense in Revactor’s API since read accomplishes the same functionality. So, in this implementation readpartial just calls read and returns whatever is available.



13
14
15
# File 'lib/revactor/mongrel.rb', line 13

# Stand-in for Socket#readpartial: +value+ is accepted but ignored, and the
# call is simply a #read with no length.
def readpartial(value = nil)
  read(nil)
end

#write(data, options = {}) ⇒ Object Also known as: <<

Write data to the socket. The call blocks until all data has been written.



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/revactor/tcp.rb', line 213

# Write data to the socket. The call blocks until a write-complete
# notification for this socket is received.
#
# data    - payload; it is passed through +encode+ before being handed to
#           the superclass +write+.
# options - :timeout, handed to the receive filter's +after+.
#
# Returns data.size (the size of the payload as given, before encoding).
#
# Raises RuntimeError if another synchronous call currently owns the socket,
# EOFError if the connection is closed while waiting, and WriteError when
# the timeout expires.
def write(data, options = {})
  # Only one synchronous call allowed at a time
  raise "already being called synchronously" unless @receiver == @controller

  active = @active
  @active = false
  @receiver = Actor.current

  # Pause reception for the duration of the write. This has to test the
  # saved mode: @active was just set to false, so checking it here would
  # never disable anything.
  disable if active and enabled?

  super(encode(data))

  Actor.receive do |filter|
    filter.when(T[:tcp_write_complete, self]) do
      @receiver = @controller
      @active = active
      enable if @active and not enabled?

      return data.size
    end

    filter.when(T[:tcp_closed, self]) do
      @active = false
      raise EOFError, "connection closed"
    end

    if timeout = options[:timeout]
      filter.after(timeout) do
        # Release the socket and restore the previous mode before raising;
        # otherwise it would stay claimed by this call with reception off.
        # NOTE(review): a completion notification for the timed-out write
        # may still be delivered later - confirm how callers should treat it.
        @receiver = @controller
        @active = active
        enable if @active and not enabled?

        raise WriteError, "write timed out"
      end
    end
  end
end