Class: OnStomp::Connections::Base
- Inherits:
-
Object
- Object
- OnStomp::Connections::Base
- Includes:
- Interfaces::ConnectionEvents
- Defined in:
- lib/onstomp/connections/base.rb
Overview
Common behavior for all connections.
Constant Summary collapse
- MAX_BYTES_PER_WRITE =
The approximate maximum number of bytes to write per call to #io_process_write.
1024 * 8
- MAX_BYTES_PER_READ =
The maximum number of bytes to read per call to #io_process_read
1024 * 4
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#last_received_at ⇒ Object
readonly
Returns the value of attribute last_received_at.
-
#last_transmitted_at ⇒ Object
readonly
Returns the value of attribute last_transmitted_at.
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
-
#version ⇒ Object
readonly
Returns the value of attribute version.
Instance Method Summary collapse
-
#close(blocking = false) ⇒ Object
Closes the #socket.
-
#configure(connected, con_cbs) ⇒ Object
Performs any necessary configuration of the connection from the CONNECTED frame sent by the broker and a
Hash
of pending callbacks. -
#connect(client, *headers) ⇒ Object
Exchanges the CONNECT/CONNECTED frame handshake with the broker and returns the version detected along with the received CONNECTED frame.
-
#connected? ⇒ true, false
Returns true if the socket has not been closed, false otherwise.
-
#flush_write_buffer ⇒ Object
Flushes the write buffer by invoking #io_process_write until the buffer is empty.
- #initialize(socket, client) ⇒ Base constructor
-
#io_process(&cb) ⇒ Object
Makes a single call to #io_process_write and a single call to #io_process_read.
-
#io_process_read ⇒ Object
Reads serialized frame data from the socket if we’re connected and and the socket is ready for reading.
-
#io_process_write ⇒ Object
Writes serialized frame data to the socket if the write buffer is not empty and socket is ready for writing.
-
#method_missing(meth, *args, &block) ⇒ Object
Checks if the missing method ends with ‘_frame’, and if so raises a UnsupportedCommandError exception.
-
#push_write_buffer(data, frame) ⇒ Object
Adds data and frame pair to the end of the write buffer.
-
#shift_write_buffer ⇒ Object
Removes the first data and frame pair from the write buffer.
-
#unshift_write_buffer(data, frame) ⇒ Object
Adds the remains of data and frame pair to the head of the write buffer.
-
#write_frame_nonblock(frame) ⇒ Object
Serializes the given frame and adds the data to the connections internal write buffer.
Methods included from Interfaces::ConnectionEvents
#install_bindings_from_client, #trigger_connection_event
Methods included from Interfaces::EventManager
#bind_event, #event_callbacks, included, #trigger_event
Constructor Details
#initialize(socket, client) ⇒ Base
21 22 23 24 25 26 27 28 29 |
# File 'lib/onstomp/connections/base.rb', line 21 def initialize socket, client @socket = socket @write_mutex = Mutex.new @closing = false @write_buffer = [] @read_buffer = [] @client = client @connection_up = false end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(meth, *args, &block) ⇒ Object
Checks if the missing method ends with ‘_frame’, and if so raises a UnsupportedCommandError exception.
88 89 90 91 92 93 94 |
# File 'lib/onstomp/connections/base.rb', line 88 def method_missing meth, *args, &block if meth.to_s =~ /^(.*)_frame$/ raise OnStomp::UnsupportedCommandError, $1.upcase else super end end |
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
6 7 8 |
# File 'lib/onstomp/connections/base.rb', line 6 def client @client end |
#last_received_at ⇒ Object (readonly)
Returns the value of attribute last_received_at.
7 8 9 |
# File 'lib/onstomp/connections/base.rb', line 7 def last_received_at @last_received_at end |
#last_transmitted_at ⇒ Object (readonly)
Returns the value of attribute last_transmitted_at.
7 8 9 |
# File 'lib/onstomp/connections/base.rb', line 7 def last_transmitted_at @last_transmitted_at end |
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
6 7 8 |
# File 'lib/onstomp/connections/base.rb', line 6 def socket @socket end |
#version ⇒ Object (readonly)
Returns the value of attribute version.
6 7 8 |
# File 'lib/onstomp/connections/base.rb', line 6 def version @version end |
Instance Method Details
#close(blocking = false) ⇒ Object
Closes the #socket. If blocking
is true, the socket will be closed immediately, otherwies the socket will remain open until #io_process_write has finished writing all of its buffered data. Once this method has been invoked, #write_frame_nonblock will not enqueue any additional frames for writing.
55 56 57 58 59 60 61 |
# File 'lib/onstomp/connections/base.rb', line 55 def close blocking=false @write_mutex.synchronize { @closing = true } if blocking io_process_write until @write_buffer.empty? socket.close end end |
#configure(connected, con_cbs) ⇒ Object
Performs any necessary configuration of the connection from the CONNECTED frame sent by the broker and a Hash
of pending callbacks. This method is called after the protocol negotiation has taken place between client and broker, and the connection that receives it will be the connection used by the client for the duration of the session.
38 39 40 41 |
# File 'lib/onstomp/connections/base.rb', line 38 def configure connected, con_cbs @version = connected.header?(:version) ? connected[:version] : '1.0' install_bindings_from_client con_cbs end |
#connect(client, *headers) ⇒ Object
Exchanges the CONNECT/CONNECTED frame handshake with the broker and returns the version detected along with the received CONNECTED frame. The supplied list of headers will be merged into the CONNECT frame sent to the broker.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/onstomp/connections/base.rb', line 68 def connect client, *headers write_frame_nonblock connect_frame(*headers) client_con = nil until client_con io_process_write { |f| client_con ||= f } end broker_con = nil until broker_con io_process_read { |f| broker_con ||= f } end raise OnStomp::ConnectFailedError if broker_con.command != 'CONNECTED' vers = broker_con.header?(:version) ? broker_con[:version] : '1.0' raise OnStomp::UnsupportedProtocolVersionError, vers unless client.versions.include?(vers) @connection_up = true [ vers, broker_con ] end |
#connected? ⇒ true, false
Returns true if the socket has not been closed, false otherwise.
45 46 47 |
# File 'lib/onstomp/connections/base.rb', line 45 def connected? !socket.closed? end |
#flush_write_buffer ⇒ Object
Flushes the write buffer by invoking #io_process_write until the buffer is empty.
98 99 100 |
# File 'lib/onstomp/connections/base.rb', line 98 def flush_write_buffer io_process_write until @write_buffer.empty? end |
#io_process(&cb) ⇒ Object
Makes a single call to #io_process_write and a single call to #io_process_read
104 105 106 107 108 109 110 |
# File 'lib/onstomp/connections/base.rb', line 104 def io_process &cb io_process_write &cb io_process_read &cb if @connection_up && !connected? triggered_close 'connection timed out', :died end end |
#io_process_read ⇒ Object
Reads serialized frame data from the socket if we’re connected and and the socket is ready for reading. The received data will be pushed to the end of a read buffer, which is then sent to the connection’s serializer for processing.
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/onstomp/connections/base.rb', line 185 def io_process_read if connected? && IO.select([socket], nil, nil, 0.1) begin data = socket.read_nonblock(MAX_BYTES_PER_READ) @read_buffer << data @last_received_at = Time.now serializer.bytes_to_frame(@read_buffer) do |frame| yield frame if block_given? client.dispatch_received frame end rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK # do not rescue EOFError triggered_close $!. rescue Exception triggered_close $!., :terminated raise end end end |
#io_process_write ⇒ Object
Writes serialized frame data to the socket if the write buffer is not empty and socket is ready for writing. Once a complete frame has been written, this method will call OnStomp::Client#dispatch_transmitted to notify the client that the frame has been sent to the broker. If a complete frame cannot be written without blocking, the unsent data is sent to the head of the write buffer to be processed first the next time this method is invoked.
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/onstomp/connections/base.rb', line 148 def io_process_write if @write_buffer.length > 0 && IO.select(nil, [socket], nil, 0.1) to_shift = @write_buffer.length / 3 written = 0 while written < MAX_BYTES_PER_WRITE data, frame = shift_write_buffer break unless data && connected? begin w = socket.write_nonblock(data) written += w @last_transmitted_at = Time.now if w < data.length unshift_write_buffer data[w..-1], frame else yield frame if block_given? client.dispatch_transmitted frame end rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK # writing will either block, or cannot otherwise be completed, # put data back and try again some other day unshift_write_buffer data, frame break rescue Exception triggered_close $!., :terminated raise end end end if @write_buffer.empty? && @closing triggered_close 'client disconnected' end end |
#push_write_buffer(data, frame) ⇒ Object
Adds data and frame pair to the end of the write buffer
123 124 125 126 127 |
# File 'lib/onstomp/connections/base.rb', line 123 def push_write_buffer data, frame @write_mutex.synchronize { @write_buffer << [data, frame] unless @closing } end |
#shift_write_buffer ⇒ Object
Removes the first data and frame pair from the write buffer
131 132 133 |
# File 'lib/onstomp/connections/base.rb', line 131 def shift_write_buffer @write_mutex.synchronize { @write_buffer.shift } end |
#unshift_write_buffer(data, frame) ⇒ Object
Adds the remains of data and frame pair to the head of the write buffer
137 138 139 |
# File 'lib/onstomp/connections/base.rb', line 137 def unshift_write_buffer data, frame @write_mutex.synchronize { @write_buffer.unshift [data, frame] } end |
#write_frame_nonblock(frame) ⇒ Object
Serializes the given frame and adds the data to the connections internal write buffer
115 116 117 118 |
# File 'lib/onstomp/connections/base.rb', line 115 def write_frame_nonblock frame ser = serializer.frame_to_bytes frame push_write_buffer ser, frame end |