Class: OnStomp::Connections::Base

Inherits:
Object
  • Object
show all
Includes:
Interfaces::ConnectionEvents
Defined in:
lib/onstomp/connections/base.rb

Overview

Common behavior for all connections.

Direct Known Subclasses

Stomp_1_0, Stomp_1_1

Constant Summary collapse

MAX_BYTES_PER_WRITE =

The approximate maximum number of bytes to write per call to #io_process_write.

1024 * 8
MAX_BYTES_PER_READ =

The maximum number of bytes to read per call to #io_process_read

1024 * 4

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Interfaces::ConnectionEvents

#install_bindings_from_client, #trigger_connection_event

Methods included from Interfaces::EventManager

#bind_event, #event_callbacks, included, #trigger_event

Constructor Details

#initialize(socket, client) ⇒ Base

Creates a new connection using the given #socket object and client. The #socket object will generally be a TCPSocket or an OpenSSL::SSL::SSLSocket and must support the methods read_nonblock write_nonblock, and close.

Parameters:



21
22
23
24
25
26
27
28
29
# File 'lib/onstomp/connections/base.rb', line 21

def initialize socket, client
  @socket = socket
  @write_mutex = Mutex.new
  @closing = false
  @write_buffer = []
  @read_buffer = []
  @client = client
  @connection_up = false
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(meth, *args, &block) ⇒ Object

Checks if the missing method ends with ‘_frame’, and if so raises a UnsupportedCommandError exception.



88
89
90
91
92
93
94
# File 'lib/onstomp/connections/base.rb', line 88

def method_missing meth, *args, &block
  if meth.to_s =~ /^(.*)_frame$/
    raise OnStomp::UnsupportedCommandError, $1.upcase
  else
    super
  end
end

Instance Attribute Details

#clientObject (readonly)

Returns the value of attribute client.



6
7
8
# File 'lib/onstomp/connections/base.rb', line 6

def client
  @client
end

#last_received_atObject (readonly)

Returns the value of attribute last_received_at.



7
8
9
# File 'lib/onstomp/connections/base.rb', line 7

def last_received_at
  @last_received_at
end

#last_transmitted_atObject (readonly)

Returns the value of attribute last_transmitted_at.



7
8
9
# File 'lib/onstomp/connections/base.rb', line 7

def last_transmitted_at
  @last_transmitted_at
end

#socketObject (readonly)

Returns the value of attribute socket.



6
7
8
# File 'lib/onstomp/connections/base.rb', line 6

def socket
  @socket
end

#versionObject (readonly)

Returns the value of attribute version.



6
7
8
# File 'lib/onstomp/connections/base.rb', line 6

def version
  @version
end

Instance Method Details

#close(blocking = false) ⇒ Object

Closes the #socket. If blocking is true, the socket will be closed immediately, otherwies the socket will remain open until #io_process_write has finished writing all of its buffered data. Once this method has been invoked, #write_frame_nonblock will not enqueue any additional frames for writing.

Parameters:

  • blocking (true, false) (defaults to: false)


55
56
57
58
59
60
61
# File 'lib/onstomp/connections/base.rb', line 55

def close blocking=false
  @write_mutex.synchronize { @closing = true }
  if blocking
    io_process_write until @write_buffer.empty?
    socket.close
  end
end

#configure(connected, con_cbs) ⇒ Object

Performs any necessary configuration of the connection from the CONNECTED frame sent by the broker and a Hash of pending callbacks. This method is called after the protocol negotiation has taken place between client and broker, and the connection that receives it will be the connection used by the client for the duration of the session.

Parameters:



38
39
40
41
# File 'lib/onstomp/connections/base.rb', line 38

def configure connected, con_cbs
  @version = connected.header?(:version) ? connected[:version] : '1.0'
  install_bindings_from_client con_cbs
end

#connect(client, *headers) ⇒ Object

Exchanges the CONNECT/CONNECTED frame handshake with the broker and returns the version detected along with the received CONNECTED frame. The supplied list of headers will be merged into the CONNECT frame sent to the broker.

Parameters:

Raises:



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/onstomp/connections/base.rb', line 68

def connect client, *headers
  write_frame_nonblock connect_frame(*headers)
  client_con = nil
  until client_con
    io_process_write { |f| client_con ||= f }
  end
  broker_con = nil
  until broker_con
    io_process_read { |f| broker_con ||= f }
  end
  raise OnStomp::ConnectFailedError if broker_con.command != 'CONNECTED'
  vers = broker_con.header?(:version) ? broker_con[:version] : '1.0'
  raise OnStomp::UnsupportedProtocolVersionError, vers unless client.versions.include?(vers)
  @connection_up = true
  [ vers, broker_con ]
end

#connected?true, false

Returns true if the socket has not been closed, false otherwise.

Returns:

  • (true, false)


45
46
47
# File 'lib/onstomp/connections/base.rb', line 45

def connected?
  !socket.closed?
end

#flush_write_bufferObject

Flushes the write buffer by invoking #io_process_write until the buffer is empty.



98
99
100
# File 'lib/onstomp/connections/base.rb', line 98

def flush_write_buffer
  io_process_write until @write_buffer.empty?
end

#io_process(&cb) ⇒ Object

Makes a single call to #io_process_write and a single call to #io_process_read



104
105
106
107
108
109
110
# File 'lib/onstomp/connections/base.rb', line 104

def io_process &cb
  io_process_write &cb
  io_process_read &cb
  if @connection_up && !connected?
    triggered_close 'connection timed out', :died
  end
end

#io_process_readObject

Reads serialized frame data from the socket if we’re connected and and the socket is ready for reading. The received data will be pushed to the end of a read buffer, which is then sent to the connection’s serializer for processing.



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/onstomp/connections/base.rb', line 185

def io_process_read
  if connected? && IO.select([socket], nil, nil, 0.1)
    begin
      data = socket.read_nonblock(MAX_BYTES_PER_READ)
      @read_buffer << data
      @last_received_at = Time.now
      serializer.bytes_to_frame(@read_buffer) do |frame|
        yield frame if block_given?
        client.dispatch_received frame
      end
    rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK
      # do not
    rescue EOFError
      triggered_close $!.message
    rescue Exception
      triggered_close $!.message, :terminated
      raise
    end
  end
end

#io_process_writeObject

Writes serialized frame data to the socket if the write buffer is not empty and socket is ready for writing. Once a complete frame has been written, this method will call OnStomp::Client#dispatch_transmitted to notify the client that the frame has been sent to the broker. If a complete frame cannot be written without blocking, the unsent data is sent to the head of the write buffer to be processed first the next time this method is invoked.



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/onstomp/connections/base.rb', line 148

def io_process_write
  if @write_buffer.length > 0 && IO.select(nil, [socket], nil, 0.1)
    to_shift = @write_buffer.length / 3
    written = 0
    while written < MAX_BYTES_PER_WRITE
      data, frame = shift_write_buffer
      break unless data && connected?
      begin
        w = socket.write_nonblock(data)
        written += w
        @last_transmitted_at = Time.now
        if w < data.length
          unshift_write_buffer data[w..-1], frame
        else
          yield frame if block_given?
          client.dispatch_transmitted frame
        end
      rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK
        # writing will either block, or cannot otherwise be completed,
        # put data back and try again some other day
        unshift_write_buffer data, frame
        break
      rescue Exception
        triggered_close $!.message, :terminated
        raise
      end
    end
  end
  if @write_buffer.empty? && @closing
    triggered_close 'client disconnected'
  end
end

#push_write_buffer(data, frame) ⇒ Object

Adds data and frame pair to the end of the write buffer

Parameters:



123
124
125
126
127
# File 'lib/onstomp/connections/base.rb', line 123

def push_write_buffer data, frame
  @write_mutex.synchronize {
    @write_buffer << [data, frame] unless @closing
  }
end

#shift_write_bufferObject

Removes the first data and frame pair from the write buffer

Parameters:



131
132
133
# File 'lib/onstomp/connections/base.rb', line 131

def shift_write_buffer
  @write_mutex.synchronize { @write_buffer.shift }
end

#unshift_write_buffer(data, frame) ⇒ Object

Adds the remains of data and frame pair to the head of the write buffer

Parameters:



137
138
139
# File 'lib/onstomp/connections/base.rb', line 137

def unshift_write_buffer data, frame
  @write_mutex.synchronize { @write_buffer.unshift [data, frame] }
end

#write_frame_nonblock(frame) ⇒ Object

Serializes the given frame and adds the data to the connections internal write buffer

Parameters:



115
116
117
118
# File 'lib/onstomp/connections/base.rb', line 115

def write_frame_nonblock frame
  ser = serializer.frame_to_bytes frame
  push_write_buffer ser, frame
end