Class: Ably::Realtime::Connection::ConnectionManager Private
- Inherits: Object
  - Object
  - Ably::Realtime::Connection::ConnectionManager
- Defined in: lib/ably/realtime/connection/connection_manager.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
ConnectionManager is responsible for all actions relating to the underlying connection and transports, such as opening, closing, and attempting reconnects. Connection state changes are performed by this class and executed from ConnectionStateMachine.
This is a private class and should never be used directly by developers, as the API is likely to change in the future.
Constant Summary
- RESOLVABLE_ERROR_CODES =
This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
Error codes from the server that can potentially be resolved
{ token_expired: Ably::Exceptions::TOKEN_EXPIRED_CODE }
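The method bodies on this page test error codes against this constant with `include?` and with `case`/`when`, which suggests that the value stored under `:token_expired` is a Range rather than a single Integer. A minimal sketch of that usage follows. The range `40140..40149` and the helper names `resolvable?` and `next_action` are assumptions made for illustration; the real value is whatever `Ably::Exceptions::TOKEN_EXPIRED_CODE` holds.

```ruby
# Stand-in for Ably::Exceptions::TOKEN_EXPIRED_CODE; the range is an assumption.
TOKEN_EXPIRED_CODE = (40140..40149)

RESOLVABLE_ERROR_CODES = { token_expired: TOKEN_EXPIRED_CODE }.freeze

# Hypothetical helper: true when the server error code may be resolved client-side.
def resolvable?(code)
  RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(code)
end

# Range#=== lets the same constant be used directly as a `when` clause.
def next_action(code)
  case code
  when RESOLVABLE_ERROR_CODES.fetch(:token_expired) then :retry
  else :fail
  end
end
```

Using `fetch` rather than `[]` means a misspelled key raises a `KeyError` instead of silently returning `nil`.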
Instance Method Summary
-
#close_connection ⇒ Object
private
Send a Close Models::ProtocolMessage to the server and release the transport.
-
#connected(protocol_message) ⇒ Object
private
Called whenever a new connection is made.
-
#connected_update(protocol_message) ⇒ Object
private
When the connection is CONNECTED and receives an update, updates the Connection details and emits an UPDATE event (#RTN4h).
-
#connection_opening_failed(error) ⇒ Object
private
Called by the transport when a connection attempt fails.
-
#destroy_transport ⇒ Object
private
Ensures the underlying transport has been disconnected and all event emitter callbacks removed.
- #detach_active_channels ⇒ Object private
-
#error_received_from_server(error) ⇒ Object
private
ProtocolMessage Error received from server.
-
#fail(error) ⇒ Object
private
Connection has failed.
- #fail_active_channels(error) ⇒ Object private
-
#force_close_connection ⇒ Object
private
Close the underlying transport immediately and set the connection state to closed.
-
#initialize(connection) ⇒ ConnectionManager
constructor
private
A new instance of ConnectionManager.
-
#nack_messages_on_all_channels(error) ⇒ Object
private
When continuity on a connection is lost, all messages, whether queued or awaiting an ACK, must be NACK’d as we now have a new connection.
-
#reconnect_transport ⇒ Object
private
Reconnect the WebsocketTransport if possible, otherwise set up a new transport.
- #release_and_establish_new_transport ⇒ Object private
-
#resend_pending_message_ack_queue ⇒ Object
private
Any message sent before an ACK/NACK was received on the previous transport needs to be resent to the Ably service so that a subsequent ACK/NACK is received.
-
#reset_liveness_timer ⇒ Object
private
Liveness timer ensures a connection that has not heard from Ably in heartbeat_interval is moved to the disconnected state automatically.
-
#respond_to_transport_disconnected_when_connecting(error) ⇒ Object
private
When a connection is disconnected whilst connecting, attempt reconnect and/or set state to :suspended or :failed.
-
#respond_to_transport_disconnected_whilst_connected(error) ⇒ Object
private
When a connection is disconnected after connecting, attempt reconnect and/or set state to :suspended or :failed.
-
#retry_count_for_state(state) ⇒ Integer
private
Number of consecutive attempts for provided state.
-
#setup_transport {|Ably::Realtime::Connection::WebsocketTransport| ... } ⇒ Object
private
Creates and sets up a new WebsocketTransport available on attribute #transport.
- #suspend_active_channels(error) ⇒ Object private
Constructor Details
#initialize(connection) ⇒ ConnectionManager
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of ConnectionManager.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 18
def initialize(connection)
  @connection = connection
  @timers     = Hash.new { |hash, key| hash[key] = [] }

  connection.unsafe_on(:closed) do
    connection.reset_resume_info
  end

  connection.unsafe_once(:connecting) do
    close_connection_when_reactor_is_stopped
  end

  EventMachine.next_tick do
    # Connect once Connection object is initialised
    connection.connect if client.auto_connect && connection.can_transition_to?(:connecting)
  end
end
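The `@timers` hash in the constructor is built with a default block, so every key starts with its own empty list on first access. The sketch below illustrates that idiom in isolation. Keying the hash by connection state and the string placeholders standing in for timer objects are assumptions for illustration only.

```ruby
# Each missing key is initialised to a fresh Array and stored in the hash.
timers = Hash.new { |hash, key| hash[key] = [] }

timers[:connecting] << "timeout-timer-1" # placeholder for a timer object
timers[:connecting] << "timeout-timer-2"
timers[:closing]    << "timeout-timer-3"

# Hash#delete does not invoke the default block, so an unknown key yields nil.
cancelled = timers.delete(:connecting) || []
```

The block form matters: `Hash.new([])` would share one Array between all keys, whereas the block creates a separate Array per key.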
Instance Method Details
#close_connection ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Send a Close Models::ProtocolMessage to the server and release the transport
# File 'lib/ably/realtime/connection/connection_manager.rb', line 169
def close_connection
  connection.(action: Ably::Models::ProtocolMessage::ACTION.Close)

  create_timeout_timer_whilst_in_state(:closing, realtime_request_timeout) do
    force_close_connection if connection.closing?
  end
end
#connected(protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called whenever a new connection is made
# File 'lib/ably/realtime/connection/connection_manager.rb', line 89
def connected(protocol_message)
  # ClientID validity is already checked as part of the incoming message processing
  client.auth.configure_client_id protocol_message.connection_details.client_id

  # Update the connection details and any associated defaults
  connection.set_connection_details protocol_message.connection_details

  if connection.key
    if protocol_message.connection_id == connection.id
      logger.debug { "ConnectionManager: Connection resumed successfully - ID #{connection.id} and key #{connection.key}" }
      EventMachine.next_tick { connection.trigger_resumed }
    else
      logger.debug { "ConnectionManager: Connection was not resumed, old connection ID #{connection.id} has been updated with new connection ID #{protocol_message.connection_id} and key #{protocol_message.connection_key}" }
      connection.reset_client_serial
      nack_messages_on_all_channels protocol_message.error
      force_reattach_on_channels protocol_message.error
    end
  else
    logger.debug { "ConnectionManager: New connection created with ID #{protocol_message.connection_id} and key #{protocol_message.connection_key}" }
    connection.reset_client_serial
  end

  reattach_suspended_channels protocol_message.error

  connection.configure_new protocol_message.connection_id, protocol_message.connection_key, protocol_message.connection_serial
end
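The branching in `#connected` distinguishes three outcomes: a brand new connection, a successful resume, and a failed resume that yields a new connection ID. A minimal sketch of that classification as a pure function is shown below. The function name `resume_outcome` and its plain-value arguments are hypothetical; the real method works on the connection and protocol message objects and has side effects.

```ruby
# Classify a CONNECTED response relative to the previous connection.
# previous_key and previous_id describe the old connection (nil when none existed);
# new_id is the connection ID reported by the server.
def resume_outcome(previous_key, previous_id, new_id)
  return :new_connection if previous_key.nil?

  new_id == previous_id ? :resumed : :not_resumed
end
```

Only the `:not_resumed` outcome implies a loss of continuity, which is why that branch is the one that deals with outstanding messages and channel reattachment.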
#connected_update(protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
When the connection is CONNECTED and receives an update, updates the Connection details and emits an UPDATE event (#RTN4h).
# File 'lib/ably/realtime/connection/connection_manager.rb', line 119
def connected_update(protocol_message)
  # ClientID validity is already checked as part of the incoming message processing
  client.auth.configure_client_id protocol_message.connection_details.client_id

  # Update the connection details and any associated defaults
  connection.set_connection_details protocol_message.connection_details

  connection.configure_new protocol_message.connection_id, protocol_message.connection_key, protocol_message.connection_serial

  state_change = Ably::Models::ConnectionStateChange.new(
    current: connection.state,
    previous: connection.state,
    event: Ably::Realtime::Connection::EVENT(:update),
    reason: protocol_message.error,
    protocol_message: protocol_message
  )
  connection.emit :update, state_change
end
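An update differs from an ordinary state change in that `current` and `previous` carry the same state and only the event is `:update`. The sketch below shows that shape with a plain Struct. `StateChange` and `build_update` are hypothetical stand-ins for illustration, not the library's `Ably::Models::ConnectionStateChange`.

```ruby
# Hypothetical stand-in for Ably::Models::ConnectionStateChange.
StateChange = Struct.new(:current, :previous, :event, :reason, keyword_init: true)

# Build the payload for an update: the state is unchanged, only the event differs.
def build_update(state, reason = nil)
  StateChange.new(current: state, previous: state, event: :update, reason: reason)
end

change = build_update(:connected)
```

A listener can therefore tell an update apart from a transition by checking the event rather than comparing states.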
#connection_opening_failed(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called by the transport when a connection attempt fails
# File 'lib/ably/realtime/connection/connection_manager.rb', line 71
def connection_opening_failed(error)
  if error.kind_of?(Ably::Exceptions::BaseAblyException)
    # Authentication errors that indicate the authentication failure is terminal should move to the failed state
    if ([401, 403].include?(error.status) && !RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)) ||
       (error.code == Ably::Exceptions::INVALID_CLIENT_ID)
      connection.transition_state_machine :failed, reason: error
      return
    end
  end

  logger.warn { "ConnectionManager: Connection to #{connection.current_host}:#{connection.port} failed; #{error.}" }
  next_state = get_next_retry_state_info
  connection.transition_state_machine next_state.fetch(:state), retry_in: next_state.fetch(:pause), reason: Ably::Exceptions::ConnectionError.new("Connection failed: #{error.}", nil, 80000, error)
end
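The condition that decides between a terminal failure and a retry can be read as a small predicate over the HTTP status and the Ably error code. The sketch below restates it in isolation. The token-expiry range `40140..40149`, the value `40012` for the invalid client ID code, and the name `terminal_failure?` are assumptions for illustration; the source only refers to these values through constants.

```ruby
TOKEN_EXPIRED_RANGE = (40140..40149) # assumed range for token-expiry codes
INVALID_CLIENT_ID   = 40012          # assumed value of the invalid client ID code

# True when a failed connection attempt should move straight to :failed
# instead of being retried.
def terminal_failure?(status, code)
  auth_failure = [401, 403].include?(status) && !TOKEN_EXPIRED_RANGE.include?(code)
  auth_failure || code == INVALID_CLIENT_ID
end
```

An expired token is excluded from the terminal case because the client may be able to obtain a new token and try again.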
#destroy_transport ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Ensures the underlying transport has been disconnected and all event emitter callbacks removed
# File 'lib/ably/realtime/connection/connection_manager.rb', line 141
def destroy_transport
  if transport
    unsubscribe_from_transport_events transport
    transport.close_connection
    connection.release_websocket_transport
  end
end
#detach_active_channels ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 289
def detach_active_channels
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching?
  end.each do |channel|
    channel.transition_state_machine! :detaching # will always move to detached immediately if connection is closed
  end
end
#error_received_from_server(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
ProtocolMessage Error received from server. Some error states can be resolved by the client library.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 245
def error_received_from_server(error)
  case error.code
  when RESOLVABLE_ERROR_CODES.fetch(:token_expired)
    next_state = get_next_retry_state_info(1)
    connection.transition_state_machine next_state.fetch(:state), retry_in: next_state.fetch(:pause), reason: error
  else
    logger.error { "ConnectionManager: Error #{error.class.name} code #{error.code} received from server '#{error.}', transitioning to failed state" }
    connection.transition_state_machine :failed, reason: error
  end
end
#fail(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Connection has failed
# File 'lib/ably/realtime/connection/connection_manager.rb', line 188
def fail(error)
  connection.logger.fatal { "ConnectionManager: Connection failed - #{error}" }
  destroy_transport
  channels.each do |channel|
    next if channel.detached? || channel.initialized?
    channel.transition_state_machine :failed, reason: error if channel.can_transition_to?(:failed)
  end
end
#fail_active_channels(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 298
def fail_active_channels(error)
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching? || channel.suspended?
  end.each do |channel|
    channel.transition_state_machine! :failed, reason: error
  end
end
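`#detach_active_channels`, `#fail_active_channels` and `#suspend_active_channels` share one pattern: select channels by state, then transition each of them. The only difference in selection is that failing also includes suspended channels. The sketch below illustrates the pattern with a plain Struct. `Channel`, `transition_all` and the state lists are hypothetical stand-ins, and the state is assigned directly rather than going through a state machine.

```ruby
# Minimal stand-in for a channel; only the state matters here.
Channel = Struct.new(:name, :state)

ACTIVE_STATES       = %i[attached attaching detaching].freeze
ACTIVE_OR_SUSPENDED = (ACTIVE_STATES + [:suspended]).freeze

# Move every channel whose state is in from_states to to_state.
# Returns the channels that were changed.
def transition_all(channels, from_states, to_state)
  channels.select { |c| from_states.include?(c.state) }
          .each   { |c| c.state = to_state }
end

channels = [
  Channel.new("a", :attached),
  Channel.new("b", :suspended),
  Channel.new("c", :initialized)
]
failed = transition_all(channels, ACTIVE_OR_SUSPENDED, :failed)
```

Channels that were never attached, such as `"c"` above, are left untouched.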
#force_close_connection ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Close the underlying transport immediately and set the connection state to closed
# File 'lib/ably/realtime/connection/connection_manager.rb', line 180
def force_close_connection
  destroy_transport
  connection.transition_state_machine :closed
end
#nack_messages_on_all_channels(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
When continuity on a connection is lost, all messages, whether queued or awaiting an ACK, must be NACK’d as we now have a new connection.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 308
def nack_messages_on_all_channels(error)
  channels.each do |channel|
    channel.manager. error, immediately: true
    channel.manager. error
  end
end
#reconnect_transport ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Reconnect the WebsocketTransport if possible, otherwise set up a new transport
# File 'lib/ably/realtime/connection/connection_manager.rb', line 158
def reconnect_transport
  if !transport || transport.disconnected?
    setup_transport
  else
    transport.reconnect connection.current_host, connection.port
  end
end
#release_and_establish_new_transport ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 150
def release_and_establish_new_transport
  destroy_transport
  setup_transport
end
#resend_pending_message_ack_queue ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Any message sent before an ACK/NACK was received on the previous transport needs to be resent to the Ably service so that a subsequent ACK/NACK is received. It is up to Ably to ensure that duplicate messages are not retransmitted on the channel, based on the serial numbers.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 269
def resend_pending_message_ack_queue
  connection..delete_if do ||
    if .ack_required?
      connection. <<
      connection.__outgoing_protocol_msgbus__.publish :protocol_message
      true
    end
  end
end
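The method above relies on `delete_if` to move items from one queue to another in a single pass: the block returns `true` for items that should leave the pending queue, and an `if` without an `else` returns `nil` for the rest, so they stay. The sketch below shows the idiom with plain Arrays. `Message`, `pending_acks` and `outgoing` are hypothetical names used only for illustration.

```ruby
# Hypothetical message type; ack_required marks messages awaiting an ACK/NACK.
Message = Struct.new(:id, :ack_required)

pending_acks = [Message.new(1, true), Message.new(2, false), Message.new(3, true)]
outgoing     = []

# delete_if removes an element when the block returns a truthy value.
# For messages that need no ACK the `if` evaluates to nil, so they are kept.
pending_acks.delete_if do |message|
  if message.ack_required
    outgoing << message
    true
  end
end
```

Order is preserved in both lists, which matters when the receiver uses serial numbers to detect duplicates.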
#reset_liveness_timer ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Liveness timer ensures a connection that has not heard from Ably in heartbeat_interval is moved to the disconnected state automatically
# File 'lib/ably/realtime/connection/connection_manager.rb', line 317
def reset_liveness_timer
  @liveness_timer.cancel if @liveness_timer
  @liveness_timer = EventMachine::Timer.new(connection.heartbeat_interval + 0.1) do
    if connection.connected? && (connection.time_since_connection_confirmed_alive? >= connection.heartbeat_interval)
      msg = "No activity seen from realtime in #{connection.heartbeat_interval}; assuming connection has dropped"
      error = Ably::Exceptions::ConnectionTimeout.new(msg, 80003, 408)
      connection.transition_state_machine! :disconnected, reason: error
    end
  end
end
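The liveness check has two parts: a timer that fires slightly after the heartbeat interval, and a test made when it fires. The sketch below separates the two as pure functions so the logic can be read without EventMachine. The names `liveness_timer_delay` and `connection_dropped?` and the keyword arguments are hypothetical; the 0.1 second margin is taken from the listing above.

```ruby
GRACE_SECONDS = 0.1 # the listing adds 0.1s so the timer fires just after the interval

# Delay to use when (re)arming the timer.
def liveness_timer_delay(heartbeat_interval)
  heartbeat_interval + GRACE_SECONDS
end

# Evaluated when the timer fires: the connection is treated as dropped only if it
# is still connected and nothing has been heard for at least a full interval.
def connection_dropped?(connected:, seconds_since_alive:, heartbeat_interval:)
  connected && seconds_since_alive >= heartbeat_interval
end
```

Because the timer is cancelled and recreated on each reset, a check only runs after a full quiet interval since the last reset.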
#respond_to_transport_disconnected_when_connecting(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
When a connection is disconnected whilst connecting, attempt reconnect and/or set state to :suspended or :failed
# File 'lib/ably/realtime/connection/connection_manager.rb', line 200
def respond_to_transport_disconnected_when_connecting(error)
  return unless connection.disconnected? || connection.suspended? # do nothing if state has changed through an explicit request
  return if currently_renewing_token? # do not always reattempt connection or change state as client may be re-authorising

  if error.kind_of?(Ably::Models::ErrorInfo)
    if RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)
      next_state = get_next_retry_state_info(1)
      logger.debug { "ConnectionManager: Transport disconnected because of token expiry, pausing #{next_state.fetch(:pause)}s before reattempting to connect" }
      EventMachine.add_timer(next_state.fetch(:pause)) { renew_token_and_reconnect error }
      return
    end
  end

  if connection.state == :suspended
    return if connection_retry_for(:suspended)
  elsif connection.state == :disconnected
    return if connection_retry_for(:disconnected)
  end

  # Fallback if no other criteria met
  connection.transition_state_machine :failed, reason: error
end
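The method above is a sequence of guard clauses, each of which ends handling early. The sketch below restates that order of precedence as a function returning a symbol for the chosen action. The function name, the keyword arguments and the token-expiry range `40140..40149` are assumptions for illustration; `retry_allowed` stands in for the result of `connection_retry_for(state)`, and `error_code` is `nil` when the error is not an `ErrorInfo`.

```ruby
TOKEN_EXPIRED = (40140..40149) # assumed token-expiry code range

# Returns the action the manager would take, as a symbol.
def disconnected_action(state:, renewing_token:, error_code:, retry_allowed:)
  # State was changed by an explicit request, so there is nothing to do.
  return :ignore unless %i[disconnected suspended].include?(state)
  # The client may be re-authorising; leave the state alone.
  return :ignore if renewing_token
  # Token expiry is resolvable: renew the token, then reconnect.
  return :renew_token_and_reconnect if error_code && TOKEN_EXPIRED.include?(error_code)
  # Otherwise retry if the retry policy for this state allows it.
  return :retry if retry_allowed

  :failed
end
```

The `:failed` result is only reached when no earlier rule applies, matching the fallback comment in the listing.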
#respond_to_transport_disconnected_whilst_connected(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
When a connection is disconnected after connecting, attempt reconnect and/or set state to :suspended or :failed
# File 'lib/ably/realtime/connection/connection_manager.rb', line 226
def respond_to_transport_disconnected_whilst_connected(error)
  unless connection.disconnected? || connection.suspended?
    logger.warn { "ConnectionManager: Connection #{"to #{connection.transport.url}" if connection.transport} was disconnected unexpectedly" }
  else
    logger.debug { "ConnectionManager: Transport disconnected whilst connection in #{connection.state} state" }
  end

  if error.kind_of?(Ably::Models::ErrorInfo) && !RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)
    logger.error { "ConnectionManager: Error in Disconnected ProtocolMessage received from the server - #{error}" }
  end

  destroy_transport
  respond_to_transport_disconnected_when_connecting error
end
#retry_count_for_state(state) ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Number of consecutive attempts for provided state
# File 'lib/ably/realtime/connection/connection_manager.rb', line 259
def retry_count_for_state(state)
  retries_for_state(state, ignore_states: [:connecting]).count
end
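The helper `retries_for_state` is not shown on this page, so the sketch below is only one plausible reading of "number of consecutive attempts for provided state": walk the state history backwards, skip the ignored states, and count how many times the given state repeats. The function `consecutive_count` and the flat Array of symbols used as history are assumptions, not the library's implementation.

```ruby
# Count how many times `state` occurs consecutively at the end of `history`,
# skipping over any states listed in ignore_states.
def consecutive_count(history, state, ignore_states: [])
  history.reverse_each
         .reject { |s| ignore_states.include?(s) }
         .take_while { |s| s == state }
         .count
end

history = %i[connecting connected disconnected connecting disconnected connecting]
```

With `:connecting` ignored, `consecutive_count(history, :disconnected, ignore_states: [:connecting])` counts the two most recent disconnections and stops at the earlier `:connected` entry.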
#setup_transport {|Ably::Realtime::Connection::WebsocketTransport| ... } ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Creates and sets up a new WebsocketTransport available on attribute #transport
# File 'lib/ably/realtime/connection/connection_manager.rb', line 40
def setup_transport
  if transport && !transport.ready_for_release?
    raise RuntimeError, 'Existing WebsocketTransport is connected, and must be closed first'
  end

  unless client.auth.authentication_security_requirements_met?
    connection.transition_state_machine :failed, reason: Ably::Exceptions::InsecureRequest.new('Cannot use Basic Auth over non-TLS connections', 401, 40103)
    return
  end

  logger.debug { 'ConnectionManager: Opening a websocket transport connection' }

  connection.create_websocket_transport.tap do |socket_deferrable|
    socket_deferrable.callback do |websocket_transport|
      subscribe_to_transport_events websocket_transport
      yield websocket_transport if block_given?
    end
    socket_deferrable.errback do |error|
      connection_opening_failed error
    end
  end

  logger.debug { "ConnectionManager: Setting up automatic connection timeout timer for #{realtime_request_timeout}s" }
  create_timeout_timer_whilst_in_state(:connecting, realtime_request_timeout) do
    connection_opening_failed Ably::Exceptions::ConnectionTimeout.new("Connection to Ably timed out after #{realtime_request_timeout}s", nil, 80014)
  end
end
#suspend_active_channels(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 280
def suspend_active_channels(error)
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching?
  end.each do |channel|
    channel.transition_state_machine! :suspended, reason: error
  end
end