Class: Stomper::Connection
- Inherits: Object
  - Object
  - Stomper::Connection
- Defined in: lib/stomper/connection.rb
Overview
This class encapsulates a client connection to a message broker through the Stomp protocol. It is also aliased as Stomper::Client.
Constant Summary
- PROTOCOL_VERSIONS = ['1.0', '1.1']
  The list of supported protocol versions.
- DEFAULT_CONFIG = { :versions => ['1.0', '1.1'], :heartbeats => [0, 0], :host => nil, :login => nil, :passcode => nil, :receiver_class => ::Stomper::Receivers::Threaded }
  The default configuration for connections. These settings have been deliberately left unfrozen to allow users to change defaults for all connections in one fell swoop.
Constants included from Extensions::Heartbeat
Extensions::Heartbeat::EXTEND_BY_VERSION
Constants included from Extensions::Events
Extensions::Events::ALIASED_EVENTS
Constants included from Extensions::Common
Extensions::Common::EXTEND_BY_VERSION
Instance Attribute Summary
- #connected_frame ⇒ Stomper::Frame? (readonly)
  The CONNECTED frame sent by the broker during the connection handshake.
- #heartbeating ⇒ Array<Fixnum> (readonly)
  The negotiated heartbeat strategy.
- #heartbeats ⇒ Array<Fixnum>
  The client-side heartbeat settings to allow for this connection.
- #host ⇒ String
  The host header value to send to the broker when connecting.
- #last_received_at ⇒ Time? (readonly)
  A timestamp set to the last time a frame was received.
- #last_transmitted_at ⇒ Time? (readonly)
  A timestamp set to the last time a frame was transmitted.
- #login ⇒ String
  The login header value to send to the broker when connecting.
- #passcode ⇒ String
  The passcode header value to send to the broker when connecting.
- #receipt_manager ⇒ Stomper::ReceiptManager (readonly)
  The receipt manager.
- #receiver_class ⇒ Class
  The class to use when instantiating a new receiver for the connection.
- #ssl ⇒ {Symbol => Object}? (readonly)
  The SSL options to use if this connection is secure.
- #subscription_manager ⇒ Stomper::SubscriptionManager (readonly)
  The subscription manager.
- #uri ⇒ URI (readonly)
  The URI representation of the broker this connection is associated with.
- #version ⇒ String? (readonly)
  The protocol version negotiated between the client and broker.
- #versions ⇒ Array<String>
  The protocol versions to allow for this connection.
Class Method Summary
- .connect(uri, options = {}) ⇒ Object (also: open)
  Creates a new connection and immediately connects it to the broker.
Instance Method Summary
- #close ⇒ Object
  Disconnects from the broker immediately.
- #connect(headers = {}) ⇒ Object
  Establishes a connection to the broker.
- #connected? ⇒ true, false
  True if a connection with the broker has been established or is in the process of being established, false otherwise.
- #duration_since_received ⇒ Fixnum
  Duration in milliseconds since a frame has been received from the broker.
- #duration_since_transmitted ⇒ Fixnum
  Duration in milliseconds since a frame has been transmitted to the broker.
- #initialize(uri, options = {}) ⇒ Connection (constructor)
  Creates a connection to a broker specified by the supplied uri.
- #open ⇒ Object
  Establishes a connection to the broker.
- #receive ⇒ Stomper::Frame
  Receives a frame from the broker.
- #receive_nonblock ⇒ Stomper::Frame?
  Receives a frame from the broker if there is data to be read from the underlying socket.
- #running? ⇒ Boolean
  Returns true if the receiver exists and is running.
- #start(headers = {}) ⇒ self
  Creates an instance of the class given by #receiver_class and starts it.
- #stop(headers = {}) ⇒ self
  Stops the instantiated receiver and calls Extensions::Common#disconnect if a connection has been established.
- #transmit(frame) ⇒ Object
  Transmits a frame to the broker.
Methods included from Extensions::Heartbeat
#alive?, #beat, #dead?, extend_by_protocol_version
Methods included from Extensions::Events
#after_receiving, #after_transmitting, #before_abort, #before_ack, #before_begin, #before_client_beat, #before_commit, #before_connect, #before_disconnect, #before_nack, #before_receiving, #before_send, #before_subscribe, #before_transmitting, #before_unsubscribe, #bind_callback, #on_abort, #on_ack, #on_begin, #on_broker_beat, #on_client_beat, #on_commit, #on_connect, #on_connected, #on_connection_closed, #on_connection_died, #on_connection_established, #on_connection_terminated, #on_disconnect, #on_error, #on_message, #on_nack, #on_receipt, #on_send, #on_subscribe, #on_unsubscribe
Methods included from Extensions::Scoping
#with_headers, #with_receipt, #with_transaction
Methods included from Extensions::Common
#abort, #ack, #begin, #commit, #disconnect, extend_by_protocol_version, #nack, #send, #subscribe, #unsubscribe
Constructor Details
#initialize(uri, options = {}) ⇒ Connection
Creates a connection to a broker specified by the supplied uri. The given uri will be resolved to a URI instance through URI.parse. The final URI object must provide a create_socket method, or an error will be raised. Both URI::STOMP and URI::STOMP_SSL provide this method, so string URIs with a scheme of either “stomp” or “stomp+ssl” will work automatically.
Most connection options can be supplied through query parameters specified in the URI or through an optional Hash parameter. If the same option is configured in both the URI’s parameters and the options hash, the options hash takes precedence. Certain options, those pertaining to SSL settings for instance, must be configured through the options hash.
    # File 'lib/stomper/connection.rb', line 151
    def initialize(uri, options={})
      @ssl = options.delete(:ssl) || {}
      @uri = uri.is_a?(::URI) ? uri : ::URI.parse(uri)
      config = ::Stomper::Support.keys_to_sym(::CGI.parse(@uri.query || '')).
        merge(::Stomper::Support.keys_to_sym(options))
      DEFAULT_CONFIG.each do |attr_name, def_val|
        if config.key? attr_name
          __send__ :"#{attr_name}=", config[attr_name]
        elsif def_val
          __send__ :"#{attr_name}=", def_val
        end
      end
      @host ||= (@uri.host||'localhost')
      @login ||= (@uri.user || '')
      @passcode ||= (@uri.password || '')
      @connected = false
      @heartbeating = [0,0]
      @last_transmitted_at = @last_received_at = nil
      @subscription_manager = ::Stomper::SubscriptionManager.new(self)
      @receipt_manager = ::Stomper::ReceiptManager.new(self)
      @connecting = false
      @disconnecting = false
      @disconnected = false
      @close_mutex = ::Mutex.new

      on_connected do |cf, con|
        unless @connected
          @version = (cf[:version].nil?||cf[:version].empty?) ? '1.0' : cf[:version]
          unless @versions.include?(@version)
            close
            raise ::Stomper::Errors::UnsupportedProtocolVersionError,
              "broker requested '#{@version}', client allows: #{@versions.inspect}"
          end
          c_x, c_y = @heartbeats
          s_x, s_y = (cf[:'heart-beat'] || '0,0').split(',').map do |v|
            vi = v.to_i
            vi > 0 ? vi : 0
          end
          @heartbeating = [ (c_x == 0||s_y == 0 ? 0 : [c_x,s_y].max),
            (c_y == 0||s_x == 0 ? 0 : [c_y,s_x].max) ]
          extend_for_protocol
        end
      end

      before_disconnect do |df, con|
        @disconnecting = true
      end
      on_disconnect do |df, con|
        @disconnected = true
        close unless df[:receipt]
      end
    end
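The precedence rule for options (options hash over URI query parameters) can be illustrated with a small standalone sketch. It does not use Stomper itself: merged_config is a hypothetical helper, and URI.decode_www_form from the standard library stands in for the query parsing that the constructor performs internally, so value shapes may differ from what the real constructor sees.

```ruby
require 'uri'

# Hypothetical helper, not part of Stomper: merges URI query parameters
# with an options hash, letting the options hash win on conflicts.
def merged_config(uri_string, options = {})
  uri = URI.parse(uri_string)
  # Turn "a=1&b=2" into { "a" => "1", "b" => "2" }, then symbolize the keys.
  from_query = URI.decode_www_form(uri.query || '').to_h
  from_query = from_query.transform_keys(&:to_sym)
  # Hash#merge keeps the argument's value when a key appears on both sides.
  from_query.merge(options.transform_keys(&:to_sym))
end

config = merged_config('stomp://localhost:61613?login=guest&host=vhost-a',
                       :host => 'vhost-b')
puts config.inspect
```

Here :login comes from the query string, while :host is taken from the options hash because both sources define it.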
Instance Attribute Details
#connected_frame ⇒ Stomper::Frame? (readonly)
The CONNECTED frame sent by the broker during the connection handshake.
    # File 'lib/stomper/connection.rb', line 34
    def connected_frame
      @connected_frame
    end
#heartbeating ⇒ Array<Fixnum> (readonly)
The negotiated heartbeat strategy. The first element is the maximum number of milliseconds that the client can go without transmitting data or a heartbeat (a zero indicates that the client does not need to send heartbeats). The second element is the maximum number of milliseconds the server will go without transmitting data or a heartbeat (a zero indicates that the server need not send any heartbeats).
    # File 'lib/stomper/connection.rb', line 56
    def heartbeating
      @heartbeating
    end
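The negotiation that produces this pair appears inside the constructor's on_connected callback. The following is a standalone restatement of that arithmetic for experimentation; negotiate_heartbeats is a hypothetical name, not a Stomper method, and the inputs are made-up values.

```ruby
# Standalone sketch of the heartbeat negotiation from the on_connected
# callback. client_beats is the client's [x, y] pair; server_header is the
# raw "heart-beat" header value from the CONNECTED frame (may be nil).
def negotiate_heartbeats(client_beats, server_header)
  c_x, c_y = client_beats
  # Missing header means "0,0"; negative or non-numeric values become 0.
  s_x, s_y = (server_header || '0,0').split(',').map do |v|
    vi = v.to_i
    vi > 0 ? vi : 0
  end
  [
    # Client-to-broker beats: disabled if either side opts out.
    (c_x == 0 || s_y == 0) ? 0 : [c_x, s_y].max,
    # Broker-to-client beats: disabled if either side opts out.
    (c_y == 0 || s_x == 0) ? 0 : [c_y, s_x].max
  ]
end

both_sides = negotiate_heartbeats([1000, 2000], '500,3000')
client_silent = negotiate_heartbeats([0, 2000], '500,3000')
no_header = negotiate_heartbeats([1000, 2000], nil)
```

Each direction uses the larger of the two intervals, and a zero on either side switches that direction off.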
#heartbeats ⇒ Array<Fixnum>
The client-side heartbeat settings to allow for this connection.
    # File 'lib/stomper/connection.rb', line 47
    def heartbeats
      @heartbeats
    end
#host ⇒ String
The host header value to send to the broker when connecting. This allows the client to inform the server which host it wishes to connect with when multiple brokers may share an IP address through virtual hosting.
    # File 'lib/stomper/connection.rb', line 66
    def host
      @host
    end
#last_received_at ⇒ Time? (readonly)
A timestamp set to the last time a frame was received. Returns nil if no frames have been received yet.
    # File 'lib/stomper/connection.rb', line 89
    def last_received_at
      @last_received_at
    end
#last_transmitted_at ⇒ Time? (readonly)
A timestamp set to the last time a frame was transmitted. Returns nil if no frames have been transmitted yet.
    # File 'lib/stomper/connection.rb', line 84
    def last_transmitted_at
      @last_transmitted_at
    end
#login ⇒ String
The login header value to send to the broker when connecting.
    # File 'lib/stomper/connection.rb', line 70
    def login
      @login
    end
#passcode ⇒ String
The passcode header value to send to the broker when connecting.
    # File 'lib/stomper/connection.rb', line 74
    def passcode
      @passcode
    end
#receipt_manager ⇒ Stomper::ReceiptManager (readonly)
The receipt manager. Maintains the list of receipt IDs and the callbacks associated with them that will be invoked when any frame with a matching receipt-id header is received.
    # File 'lib/stomper/connection.rb', line 101
    def receipt_manager
      @receipt_manager
    end
#receiver_class ⇒ Class
The class to use when instantiating a new receiver for the connection. Defaults to Receivers::Threaded.
    # File 'lib/stomper/connection.rb', line 79
    def receiver_class
      @receiver_class
    end
#ssl ⇒ {Symbol => Object}? (readonly)
The SSL options to use if this connection is secure.
    # File 'lib/stomper/connection.rb', line 60
    def ssl
      @ssl
    end
#subscription_manager ⇒ Stomper::SubscriptionManager (readonly)
The subscription manager. Maintains the list of destinations subscribed to as well as the callbacks to invoke when a MESSAGE frame is received on one of them.
    # File 'lib/stomper/connection.rb', line 95
    def subscription_manager
      @subscription_manager
    end
#uri ⇒ URI (readonly)
The URI representation of the broker this connection is associated with.
    # File 'lib/stomper/connection.rb', line 30
    def uri
      @uri
    end
#version ⇒ String? (readonly)
The protocol version negotiated between the client and broker. Will be nil until the connection has been established.
    # File 'lib/stomper/connection.rb', line 43
    def version
      @version
    end
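How the version is chosen can be read off the constructor's on_connected callback: a missing or empty version header is treated as '1.0', and a version outside the allowed list is rejected. The sketch below restates that rule in isolation; negotiated_version and UnsupportedVersion are hypothetical names used only for illustration (the library raises Stomper::Errors::UnsupportedProtocolVersionError).

```ruby
# Hypothetical stand-in for Stomper::Errors::UnsupportedProtocolVersionError.
class UnsupportedVersion < StandardError; end

# Picks the protocol version from the CONNECTED frame's version header,
# defaulting to '1.0' when the broker sent none.
def negotiated_version(connected_version, allowed)
  version = (connected_version.nil? || connected_version.empty?) ? '1.0' : connected_version
  unless allowed.include?(version)
    raise UnsupportedVersion,
          "broker requested '#{version}', client allows: #{allowed.inspect}"
  end
  version
end

allowed = ['1.0', '1.1']
legacy = negotiated_version(nil, allowed)     # broker sent no version header
modern = negotiated_version('1.1', allowed)
```

A broker that answers with a version not in the allowed list makes the helper raise, mirroring the point at which the real callback closes the connection.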
#versions ⇒ Array<String>
The protocol versions to allow for this connection.
    # File 'lib/stomper/connection.rb', line 38
    def versions
      @versions
    end
Class Method Details
.connect(uri, options = {}) ⇒ Object Also known as: open
Creates a new connection and immediately connects it to the broker.
    # File 'lib/stomper/connection.rb', line 307
    def connect(uri, options={})
      conx = new(uri, options)
      conx.connect
      conx
    end
Instance Method Details
#close ⇒ Object
Disconnects from the broker immediately. This is not a polite disconnect: no DISCONNECT frame is transmitted to the broker, and the socket is shut down and closed immediately. Calls to Extensions::Common#disconnect invoke this method internally after the DISCONNECT frame has been transmitted. When the connection is open, this method triggers the on_connection_closed event. In the source shown for this method, which takes no parameters, on_connection_terminated is triggered first if no DISCONNECT frame has been sent.
    # File 'lib/stomper/connection.rb', line 373
    def close
      @close_mutex.synchronize do
        if connected?
          begin
            trigger_event(:on_connection_terminated, self) unless @disconnected
          ensure
            unless @socket.closed?
              @socket.shutdown(2) rescue nil
              @socket.close rescue nil
            end
            @connecting = @connected = false
          end
          trigger_event(:on_connection_closed, self)
          subscription_manager.clear
          receipt_manager.clear
        end
      end
    end
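The event ordering in #close can be tried out without a broker. The sketch below is a simplified model, not Stomper code: FakeSocket and SketchConnection are hypothetical classes, events are recorded in an array instead of being dispatched to callbacks, and the clearing of the subscription and receipt managers is left out.

```ruby
# Hypothetical stand-in for a socket; it only tracks whether it was closed.
class FakeSocket
  def initialize
    @closed = false
  end

  def closed?
    @closed
  end

  def shutdown(_how); end

  def close
    @closed = true
  end
end

# Simplified model of the close logic: records events instead of firing them.
class SketchConnection
  attr_reader :events

  def initialize(socket)
    @socket = socket
    @connected = true
    @disconnected = false # becomes true once a DISCONNECT has been sent
    @events = []
    @close_mutex = Mutex.new
  end

  def connected?
    @connected && !@socket.closed?
  end

  # Models a polite disconnect followed by the internal call to close.
  def polite_disconnect!
    @disconnected = true
    close
  end

  def close
    @close_mutex.synchronize do
      if connected?
        @events << :on_connection_terminated unless @disconnected
        unless @socket.closed?
          @socket.shutdown(2)
          @socket.close
        end
        @connected = false
        @events << :on_connection_closed
      end
    end
  end
end

abrupt = SketchConnection.new(FakeSocket.new)
abrupt.close
abrupt.close # second call does nothing: the connection is already closed

polite = SketchConnection.new(FakeSocket.new)
polite.polite_disconnect!
```

In this model an abrupt close records both events, while a close that follows a polite disconnect records only on_connection_closed.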
#connect(headers = {}) ⇒ Object
Establishes a connection to the broker. After the socket connection is established, a CONNECT/STOMP frame will be sent to the broker and a frame will be read from the TCP stream. If the frame is a CONNECTED frame, the connection has been established and you’re ready to go, otherwise the socket will be closed and an error will be raised.
    # File 'lib/stomper/connection.rb', line 274
    def connect(headers={})
      unless @connected
        @socket = @uri.create_socket(@ssl)
        @serializer = ::Stomper::FrameSerializer.new(@socket)
        m_headers = {
          :'accept-version' => @versions.join(','),
          :host => @host,
          :'heart-beat' => @heartbeats.join(','),
          :login => @login,
          :passcode => @passcode
        }
        @disconnecting = false
        @disconnected = false
        @connecting = true
        transmit create_frame('CONNECT', headers, m_headers)
        receive.tap do |f|
          if f.command == 'CONNECTED'
            @connected_frame = f
            @connected = true
            @connecting = false
            trigger_event(:on_connection_established, self)
          else
            close
            raise ::Stomper::Errors::ConnectFailedError, 'broker did not send CONNECTED frame'
          end
        end
      end
    end
#connected? ⇒ true, false
True if a connection with the broker has been established or is in the process of being established, false otherwise.
    # File 'lib/stomper/connection.rb', line 318
    def connected?
      (@connecting || @connected) && !@socket.closed?
    end
#duration_since_received ⇒ Fixnum
Duration in milliseconds since a frame has been received from the broker. Returns nil if no frame has been received yet.
    # File 'lib/stomper/connection.rb', line 454
    def duration_since_received
      @last_received_at && ((Time.now - @last_received_at)*1000).to_i
    end
#duration_since_transmitted ⇒ Fixnum
Duration in milliseconds since a frame has been transmitted to the broker. Returns nil if no frame has been transmitted yet.
    # File 'lib/stomper/connection.rb', line 448
    def duration_since_transmitted
      @last_transmitted_at && ((Time.now - @last_transmitted_at)*1000).to_i
    end
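Both duration methods share the same arithmetic: subtract the stored timestamp from the current time, scale seconds to milliseconds, and truncate. A minimal sketch of that calculation follows; duration_ms_since is a hypothetical helper, and the injectable now parameter is an addition made here so the result can be checked with fixed times.

```ruby
# Hypothetical helper mirroring the duration calculation. Returns nil when
# no timestamp has been recorded, otherwise whole milliseconds elapsed.
def duration_ms_since(timestamp, now = Time.now)
  timestamp && ((now - timestamp) * 1000).to_i
end

never = duration_ms_since(nil)
elapsed = duration_ms_since(Time.at(10), Time.at(12.5))
```

The short-circuiting `&&` is what yields nil before any frame has been seen, so callers should handle nil as well as an integer.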
#open ⇒ Object
Establishes a connection to the broker. After the socket connection is established, a CONNECT/STOMP frame will be sent to the broker and a frame will be read from the TCP stream. If the frame is a CONNECTED frame, the connection has been established and you’re ready to go, otherwise the socket will be closed and an error will be raised.
    # File 'lib/stomper/connection.rb', line 302
    def connect(headers={})
      unless @connected
        @socket = @uri.create_socket(@ssl)
        @serializer = ::Stomper::FrameSerializer.new(@socket)
        m_headers = {
          :'accept-version' => @versions.join(','),
          :host => @host,
          :'heart-beat' => @heartbeats.join(','),
          :login => @login,
          :passcode => @passcode
        }
        @disconnecting = false
        @disconnected = false
        @connecting = true
        transmit create_frame('CONNECT', headers, m_headers)
        receive.tap do |f|
          if f.command == 'CONNECTED'
            @connected_frame = f
            @connected = true
            @connecting = false
            trigger_event(:on_connection_established, self)
          else
            close
            raise ::Stomper::Errors::ConnectFailedError, 'broker did not send CONNECTED frame'
          end
        end
      end
    end
#receive ⇒ Stomper::Frame
Receives a frame from the broker.
    # File 'lib/stomper/connection.rb', line 413
    def receive
      if alive?
        trigger_event(:before_receiving, nil, self)
        begin
          @serializer.read_frame.tap do |f|
            if f.nil?
              close
            else
              @last_received_at = Time.now
              trigger_event(:after_receiving, f, self)
              trigger_received_frame(f, self)
            end
          end
        rescue ::IOError, ::SystemCallError
          close
          raise
        end
      else
        trigger_event(:on_connection_died, self)
        nil
      end
    end
#receive_nonblock ⇒ Stomper::Frame?
Receives a frame from the broker if there is data to be read from the underlying socket. If there is no data available for reading from the socket, nil is returned.
Note: While this method will not block if there is no data ready for reading, if any data is available it will block until a complete frame has been read.
    # File 'lib/stomper/connection.rb', line 442
    def receive_nonblock
      receive if @socket.ready?
    end
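The "check readiness, then read a whole unit" behaviour can be demonstrated with a pipe from the standard library. This is an analogy rather than Stomper code: read_line_if_ready is a hypothetical helper, IO.select with a zero timeout stands in for the socket readiness check, and a newline-terminated line stands in for a complete frame.

```ruby
# Hypothetical helper: returns nil at once when nothing is readable,
# otherwise blocks until a full newline-terminated line has arrived.
def read_line_if_ready(io)
  # A timeout of 0 makes IO.select poll; it returns nil if nothing is ready.
  readable, = IO.select([io], nil, nil, 0)
  readable ? io.gets : nil
end

reader, writer = IO.pipe
first = read_line_if_ready(reader)  # nothing has been written yet
writer.write("MESSAGE\n")
second = read_line_if_ready(reader) # data is waiting, so the line is read
writer.close
reader.close
```

If only part of a line had been written, the second call would wait for the rest, which is the same caveat the note above gives for partially received frames.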
#running? ⇒ Boolean
Returns true if the receiver exists and is running.
    # File 'lib/stomper/connection.rb', line 355
    def running?
      @receiver && @receiver.running?
    end
#start(headers = {}) ⇒ self
Creates an instance of the class given by #receiver_class and starts it. A call to #connect will be made if the connection has not been established. The class to instantiate can be overridden on a per-connection basis, or for all connections by changing DEFAULT_CONFIG.
    # File 'lib/stomper/connection.rb', line 331
    def start(headers={})
      connect(headers) unless @connected
      @receiver ||= receiver_class.new(self)
      @receiver.start
      self
    end
#stop(headers = {}) ⇒ self
Stops the instantiated receiver and calls Extensions::Common#disconnect if a connection has been established.
    # File 'lib/stomper/connection.rb', line 348
    def stop(headers={})
      disconnect(headers) unless @disconnecting
      @receiver && @receiver.stop
      self
    end
#transmit(frame) ⇒ Object
Transmits a frame to the broker. This is a low-level method used internally by the more user-friendly interface.
    # File 'lib/stomper/connection.rb', line 395
    def transmit(frame)
      trigger_event(:on_connection_died, self) if dead?
      trigger_event(:before_transmitting, frame, self)
      trigger_before_transmitted_frame(frame, self)
      begin
        @serializer.write_frame(frame).tap do
          @last_transmitted_at = Time.now
          trigger_event(:after_transmitting, frame, self)
          trigger_transmitted_frame(frame, self)
        end
      rescue ::IOError, ::SystemCallError
        close
        raise
      end
    end