Class: Ably::Realtime::Connection::ConnectionManager Private
- Inherits: Object
  - Object
  - Ably::Realtime::Connection::ConnectionManager
- Defined in: lib/ably/realtime/connection/connection_manager.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
ConnectionManager is responsible for all actions relating to the underlying connection and transports, such as opening, closing, and attempting reconnects. Connection state changes are performed by this class and executed from ConnectionStateMachine.
This is a private class and should never be used directly by developers as the API is likely to change in future.
Constant Summary
- RESOLVABLE_ERROR_CODES = { token_expired: Ably::Exceptions::TOKEN_EXPIRED_CODE }
  This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.
  Error codes from the server that can potentially be resolved.
Instance Method Summary
- #close_connection ⇒ Object private
  Send a Close Models::ProtocolMessage to the server and release the transport.
- #connected(protocol_message) ⇒ Object private
  Called whenever a new connection is made.
- #connected_update(protocol_message) ⇒ Object private
  When the connection is CONNECTED and receives an update, update the Connection details and emit an UPDATE event (RTN4h).
- #connection_opening_failed(error) ⇒ Object private
  Called by the transport when a connection attempt fails.
- #destroy_transport ⇒ Object private
  Ensures the underlying transport has been disconnected and all event emitter callbacks removed.
- #detach_active_channels ⇒ Object private
- #error_received_from_server(error) ⇒ Object private
  ProtocolMessage Error received from server.
- #fail(error) ⇒ Object private
  Connection has failed.
- #fail_active_channels(error) ⇒ Object private
- #force_close_connection ⇒ Object private
  Close the underlying transport immediately and set the connection state to closed.
- #initialize(connection) ⇒ ConnectionManager constructor private
  A new instance of ConnectionManager.
- #nack_messages_on_all_channels(error) ⇒ Object private
  When continuity on a connection is lost, all messages, whether queued or awaiting an ACK, must be NACK’d as we now have a new connection.
- #reconnect_transport ⇒ Object private
  Reconnect the WebsocketTransport if possible, otherwise set up a new transport.
- #reintialize_failed_chanels ⇒ Object private
- #release_and_establish_new_transport ⇒ Object private
- #resend_pending_message_ack_queue ⇒ Object private
  Any message sent before an ACK/NACK was received on the previous transport needs to be resent to the Ably service so that a subsequent ACK/NACK is received.
- #reset_liveness_timer ⇒ Object private
  Liveness timer ensures a connection that has not heard from Ably in heartbeat_interval is moved to the disconnected state automatically.
- #respond_to_transport_disconnected_when_connecting(error) ⇒ Object private
  When a connection is disconnected whilst connecting, attempt reconnect and/or set state to :suspended or :failed.
- #respond_to_transport_disconnected_whilst_connected(error) ⇒ Object private
  When a connection is disconnected after connecting, attempt reconnect and/or set state to :suspended or :failed.
- #retry_count_for_state(state) ⇒ Integer private
  Number of consecutive attempts for provided state.
- #setup_transport {|Ably::Realtime::Connection::WebsocketTransport| ... } ⇒ Object private
  Creates and sets up a new WebsocketTransport available on attribute #transport.
- #suspend_active_channels(error) ⇒ Object private
Constructor Details
#initialize(connection) ⇒ ConnectionManager
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of ConnectionManager.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 19
def initialize(connection)
  @connection = connection
  @timers     = Hash.new { |hash, key| hash[key] = [] }

  # RTN8c, RTN9c
  connection.unsafe_on(:closing, :closed, :suspended, :failed) do
    connection.reset_resume_info
  end

  connection.unsafe_once(:connecting) do
    close_connection_when_reactor_is_stopped
  end

  EventMachine.next_tick do
    # Connect once Connection object is initialised
    connection.connect if client.auto_connect && connection.can_transition_to?(:connecting)
  end
end
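The constructor's `@timers` hash relies on the block form of `Hash.new`, which creates and stores a fresh array the first time a key is read. The following is a minimal sketch of that idiom on its own; `build_timer_registry` and the symbols pushed into it are hypothetical names used for illustration, not part of the library.

```ruby
# Hypothetical helper mirroring the @timers hash built in the constructor.
def build_timer_registry
  # The block runs on the first read of a missing key and stores a new array,
  # so every state gets its own independent list of timers.
  Hash.new { |hash, key| hash[key] = [] }
end

registry = build_timer_registry
registry[:connecting] << :timeout_timer
registry[:connecting] << :retry_timer
registry[:closing] << :close_timer

# Only the keys that were actually touched exist in the hash.
puts registry.keys.inspect
puts registry[:connecting].length
```

The block form matters here: `Hash.new([])` would hand back the same shared array for every missing key, so timers for different states would end up in one list.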
Instance Method Details
#close_connection ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Send a Close Models::ProtocolMessage to the server and release the transport
# File 'lib/ably/realtime/connection/connection_manager.rb', line 192
def close_connection
  connection.send_protocol_message(action: Ably::Models::ProtocolMessage::ACTION.Close)

  create_timeout_timer_whilst_in_state(:closing, realtime_request_timeout) do
    force_close_connection if connection.closing?
  end
end
#connected(protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called whenever a new connection is made
# File 'lib/ably/realtime/connection/connection_manager.rb', line 109
def connected(protocol_message)
  # ClientID validity is already checked as part of the incoming message processing
  client.auth.configure_client_id protocol_message.connection_details.client_id

  # Update the connection details and any associated defaults
  connection.set_connection_details protocol_message.connection_details

  is_connection_resume_or_recover_attempt = !connection.key.nil_or_empty? || !client.recover.nil_or_empty?

  # RTN15c7, RTN16d
  failed_resume_or_recover = !protocol_message.connection_id == connection.id && !protocol_message.error.nil?
  if is_connection_resume_or_recover_attempt and failed_resume_or_recover # RTN15c7
    connection.reset_client_msg_serial
  end
  client.disable_automatic_connection_recovery # RTN16k, explicitly setting null, so it won't be used for subsequent connection requests

  if connection.key
    if protocol_message.connection_id == connection.id
      logger.debug { "ConnectionManager: Connection resumed successfully - ID #{connection.id} and key #{connection.key}" }
    else
      nack_messages_on_all_channels protocol_message.error
    end
  else
    logger.debug { "ConnectionManager: New connection created with ID #{protocol_message.connection_id} and key #{protocol_message.connection_details.connection_key}" }
  end

  connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key

  force_reattach_on_channels protocol_message.error # irrespective of connection success/failure, reattach channels
end
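The `if connection.key` branching above separates three outcomes: a brand new connection, a successful resume, and a resume attempt that came back with a different connection ID. A minimal sketch of that decision, assuming a hypothetical helper `connection_outcome` and plain string IDs (neither is part of the library):

```ruby
# Hypothetical helper, not part of ably-ruby: classifies a CONNECTED message
# with the same comparisons used in the listing above.
def connection_outcome(existing_key:, existing_id:, new_id:)
  # No previous connection key means there was nothing to resume.
  return :new_connection if existing_key.nil?

  # A matching connection ID means the server resumed the old connection;
  # a different ID means continuity was lost.
  new_id == existing_id ? :resumed : :not_resumed
end

puts connection_outcome(existing_key: nil, existing_id: nil, new_id: 'abc')
puts connection_outcome(existing_key: 'k1', existing_id: 'abc', new_id: 'abc')
puts connection_outcome(existing_key: 'k1', existing_id: 'abc', new_id: 'xyz')
```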
#connected_update(protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
When the connection is CONNECTED and receives an update, update the Connection details and emit an UPDATE event (RTN4h)
# File 'lib/ably/realtime/connection/connection_manager.rb', line 142
def connected_update(protocol_message)
  # ClientID validity is already checked as part of the incoming message processing
  client.auth.configure_client_id protocol_message.connection_details.client_id

  # Update the connection details and any associated defaults
  connection.set_connection_details protocol_message.connection_details

  connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key

  state_change = Ably::Models::ConnectionStateChange.new(
    current: connection.state,
    previous: connection.state,
    event: Ably::Realtime::Connection::EVENT(:update),
    reason: protocol_message.error,
    protocol_message: protocol_message
  )
  connection.emit :update, state_change
end
#connection_opening_failed(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Called by the transport when a connection attempt fails
# File 'lib/ably/realtime/connection/connection_manager.rb', line 86
def connection_opening_failed(error)
  if error.kind_of?(Ably::Exceptions::BaseAblyException)
    # Authentication errors that indicate the authentication failure is terminal should move to the failed state
    if ([401, 403].include?(error.status) && !RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)) ||
       (error.code == Ably::Exceptions::Codes::INVALID_CLIENT_ID)
      connection.transition_state_machine :failed, reason: error
      return
    end
  end

  logger.warn { "ConnectionManager: Connection to #{connection.current_host}:#{connection.port} failed; #{error.message}" }
  next_state = get_next_retry_state_info

  if connection.state == next_state.fetch(:state)
    logger.error { "ConnectionManager: Skipping next retry state after connection opening failed as already in state #{next_state}\n#{caller[0..20].join("\n")}" }
  else
    connection.transition_state_machine next_state.fetch(:state), retry_in: next_state.fetch(:pause), reason: Ably::Exceptions::ConnectionError.new("Connection failed: #{error.message}", nil, Ably::Exceptions::Codes::CONNECTION_FAILED, error)
  end
end
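The first conditional above decides whether a failed connection attempt is terminal or worth retrying. The sketch below restates that rule as a standalone predicate. The numeric values are assumptions made for illustration: this page does not show the value of `TOKEN_EXPIRED_CODE` or `INVALID_CLIENT_ID`, so `40140..40149` and `40012` are placeholders, and `terminal_failure?` is a hypothetical name.

```ruby
# Assumed values for illustration only; the real constants live in Ably::Exceptions.
SKETCH_TOKEN_EXPIRED_CODES = (40140..40149).freeze
SKETCH_INVALID_CLIENT_ID   = 40012

# Hypothetical predicate: true when the failure should move the connection to :failed.
def terminal_failure?(status:, code:)
  # 401/403 responses are terminal unless the code signals an expired token,
  # which the client can resolve by renewing the token.
  auth_failure = [401, 403].include?(status) && !SKETCH_TOKEN_EXPIRED_CODES.include?(code)
  auth_failure || code == SKETCH_INVALID_CLIENT_ID
end

puts terminal_failure?(status: 401, code: 40101) # auth failure, not token expiry
puts terminal_failure?(status: 401, code: 40142) # token expiry, can be retried
puts terminal_failure?(status: 500, code: 50000) # not an auth failure
```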
#destroy_transport ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Ensures the underlying transport has been disconnected and all event emitter callbacks removed
# File 'lib/ably/realtime/connection/connection_manager.rb', line 164
def destroy_transport
  if transport
    unsubscribe_from_transport_events transport
    transport.close_connection
    connection.release_websocket_transport
  end
end
#detach_active_channels ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 314
def detach_active_channels
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching?
  end.each do |channel|
    channel.transition_state_machine! :detaching # will always move to detached immediately if connection is closed
  end
end
#error_received_from_server(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
ProtocolMessage Error received from server. Some error states can be resolved by the client library.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 268
def error_received_from_server(error)
  case error.code
  when RESOLVABLE_ERROR_CODES.fetch(:token_expired)
    next_state = get_next_retry_state_info(1)
    connection.transition_state_machine next_state.fetch(:state), retry_in: next_state.fetch(:pause), reason: error
  else
    logger.error { "ConnectionManager: Error #{error.class.name} code #{error.code} received from server '#{error.message}', transitioning to failed state" }
    connection.transition_state_machine :failed, reason: error
  end
end
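The `case`/`when` above works because `when` compares with `===`, so a Range in the `when` clause matches any code inside it. Other listings on this page call `.include?` on the same constant, which suggests it is a Range or similar collection, but its value is not shown here. The sketch below therefore uses an assumed range of `40140..40149`; `SKETCH_RESOLVABLE_ERROR_CODES` and `next_action_for` are hypothetical names.

```ruby
# Assumed value for illustration; the page only names Ably::Exceptions::TOKEN_EXPIRED_CODE.
SKETCH_RESOLVABLE_ERROR_CODES = { token_expired: 40140..40149 }.freeze

# Hypothetical helper: maps a server error code to the next step.
def next_action_for(code)
  case code
  when SKETCH_RESOLVABLE_ERROR_CODES.fetch(:token_expired)
    # Range#=== is true for any code inside the range.
    :retry
  else
    :failed
  end
end

puts next_action_for(40142)
puts next_action_for(40000)
```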
#fail(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Connection has failed
# File 'lib/ably/realtime/connection/connection_manager.rb', line 211
def fail(error)
  connection.logger.fatal { "ConnectionManager: Connection failed - #{error}" }
  destroy_transport
  channels.each do |channel|
    next if channel.detached? || channel.initialized?
    channel.transition_state_machine :failed, reason: error if channel.can_transition_to?(:failed)
  end
end
#fail_active_channels(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 323
def fail_active_channels(error)
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching? || channel.suspended?
  end.each do |channel|
    channel.transition_state_machine! :failed, reason: error
  end
end
#force_close_connection ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Close the underlying transport immediately and set the connection state to closed
# File 'lib/ably/realtime/connection/connection_manager.rb', line 203
def force_close_connection
  destroy_transport
  connection.transition_state_machine :closed
end
#nack_messages_on_all_channels(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
When continuity on a connection is lost, all messages, whether queued or awaiting an ACK, must be NACK’d as we now have a new connection
# File 'lib/ably/realtime/connection/connection_manager.rb', line 342
def nack_messages_on_all_channels(error)
  channels.each do |channel|
    channel.manager.fail_messages_awaiting_ack error, immediately: true
    channel.manager.fail_queued_messages error
  end
end
#reconnect_transport ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Reconnect the WebsocketTransport if possible, otherwise set up a new transport
# File 'lib/ably/realtime/connection/connection_manager.rb', line 181
def reconnect_transport
  if !transport || transport.disconnected?
    setup_transport
  else
    transport.reconnect connection.current_host, connection.port
  end
end
#reintialize_failed_chanels ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 332
def reintialize_failed_chanels
  channels.select do |channel|
    channel.failed?
  end.each do |channel|
    channel.transition_state_machine :initialized
  end
end
#release_and_establish_new_transport ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 173
def release_and_establish_new_transport
  destroy_transport
  setup_transport
end
#resend_pending_message_ack_queue ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Any message sent before an ACK/NACK was received on the previous transport needs to be resent to the Ably service so that a subsequent ACK/NACK is received. It is up to Ably to ensure that duplicate messages are not retransmitted on the channel, based on the message serial numbers.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 294
def resend_pending_message_ack_queue
  connection.__pending_message_ack_queue__.delete_if do |protocol_message|
    if protocol_message.ack_required?
      connection.__outgoing_message_queue__ << protocol_message
      connection.__outgoing_protocol_msgbus__.publish :protocol_message
      true
    end
  end
end
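The listing above uses `delete_if` to move items between queues in a single pass: the block returns `true` for messages that were re-queued (so they are removed from the pending queue) and `nil` for the rest (so they stay). A minimal sketch of that idiom with plain arrays follows; `SketchMessage` and `requeue_pending` are hypothetical stand-ins, not library types.

```ruby
# Hypothetical stand-in for a protocol message.
SketchMessage = Struct.new(:id, :ack_required) do
  def ack_required?
    ack_required
  end
end

# Moves every message that still needs an ACK from pending to outgoing.
def requeue_pending(pending, outgoing)
  pending.delete_if do |message|
    if message.ack_required?
      outgoing << message
      true # remove from the pending queue
    end
    # An implicit nil here keeps messages that do not need an ACK.
  end
  outgoing
end

pending  = [SketchMessage.new(1, true), SketchMessage.new(2, false), SketchMessage.new(3, true)]
outgoing = requeue_pending(pending, [])
puts outgoing.map(&:id).inspect
puts pending.map(&:id).inspect
```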
#reset_liveness_timer ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Liveness timer ensures a connection that has not heard from Ably in heartbeat_interval is moved to the disconnected state automatically
# File 'lib/ably/realtime/connection/connection_manager.rb', line 351
def reset_liveness_timer
  @liveness_timer.cancel if @liveness_timer
  @liveness_timer = EventMachine::Timer.new(connection.heartbeat_interval + 0.1) do
    if connection.connected? && (connection.time_since_connection_confirmed_alive? >= connection.heartbeat_interval)
      msg = "No activity seen from realtime in #{connection.heartbeat_interval}; assuming connection has dropped";
      error = Ably::Exceptions::ConnectionTimeout.new(msg, Ably::Exceptions::Codes::DISCONNECTED, 408)
      connection.transition_state_machine! :disconnected, reason: error
    end
  end
end
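The check inside the timer block can be read without EventMachine. The sketch below restates it as two small functions with hypothetical names. It assumes times are plain numbers of seconds; the 0.1s margin is taken from the listing, and the reason given for it in the comment is an interpretation rather than something this page states.

```ruby
# Hypothetical restatement of the condition evaluated when the timer fires.
def liveness_expired?(connected:, seconds_since_alive:, heartbeat_interval:)
  connected && seconds_since_alive >= heartbeat_interval
end

# The timer is scheduled slightly after the heartbeat interval, presumably so
# that a fully silent connection satisfies the >= comparison when it fires.
def liveness_timer_delay(heartbeat_interval)
  heartbeat_interval + 0.1
end

puts liveness_expired?(connected: true, seconds_since_alive: 16, heartbeat_interval: 15)
puts liveness_expired?(connected: true, seconds_since_alive: 3, heartbeat_interval: 15)
puts liveness_expired?(connected: false, seconds_since_alive: 60, heartbeat_interval: 15)
```

Because the timer is cancelled and recreated on every call, a connection that keeps receiving activity never reaches the expiry branch.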
#respond_to_transport_disconnected_when_connecting(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
When a connection is disconnected whilst connecting, attempt reconnect and/or set state to :suspended or :failed
# File 'lib/ably/realtime/connection/connection_manager.rb', line 223
def respond_to_transport_disconnected_when_connecting(error)
  return unless connection.disconnected? || connection.suspended? # do nothing if state has changed through an explicit request
  return if currently_renewing_token? # do not always reattempt connection or change state as client may be re-authorising

  if error.kind_of?(Ably::Models::ErrorInfo)
    if RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)
      next_state = get_next_retry_state_info(1)
      logger.debug { "ConnectionManager: Transport disconnected because of token expiry, pausing #{next_state.fetch(:pause)}s before reattempting to connect" }
      EventMachine.add_timer(next_state.fetch(:pause)) { renew_token_and_reconnect error }
      return
    end
  end

  if connection.state == :suspended
    return if connection_retry_for(:suspended)
  elsif connection.state == :disconnected
    return if connection_retry_for(:disconnected)
  end

  # Fallback if no other criteria met
  connection.transition_state_machine :failed, reason: error
end
#respond_to_transport_disconnected_whilst_connected(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
When a connection is disconnected after connecting, attempt reconnect and/or set state to :suspended or :failed
# File 'lib/ably/realtime/connection/connection_manager.rb', line 249
def respond_to_transport_disconnected_whilst_connected(error)
  unless connection.disconnected? || connection.suspended?
    logger.warn { "ConnectionManager: Connection #{"to #{connection.transport.url}" if connection.transport} was disconnected unexpectedly" }
  else
    logger.debug { "ConnectionManager: Transport disconnected whilst connection in #{connection.state} state" }
  end

  if error.kind_of?(Ably::Models::ErrorInfo) && !RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)
    logger.error { "ConnectionManager: Error in Disconnected ProtocolMessage received from the server - #{error}" }
  end

  destroy_transport
  respond_to_transport_disconnected_when_connecting error
end
#retry_count_for_state(state) ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Number of consecutive attempts for provided state
# File 'lib/ably/realtime/connection/connection_manager.rb', line 284
def retry_count_for_state(state)
  retries_for_state(state, ignore_states: [:connecting]).count
end
#setup_transport {|Ably::Realtime::Connection::WebsocketTransport| ... } ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Creates and sets up a new WebsocketTransport available on attribute #transport
# File 'lib/ably/realtime/connection/connection_manager.rb', line 42
def setup_transport
  if transport && !transport.ready_for_release?
    raise RuntimeError, 'Existing WebsocketTransport is connected, and must be closed first'
  end

  unless client.auth.authentication_security_requirements_met?
    connection.transition_state_machine :failed, reason: Ably::Exceptions::InsecureRequest.new('Cannot use Basic Auth over non-TLS connections', 401, Ably::Exceptions::Codes::INVALID_USE_OF_BASIC_AUTH_OVER_NONTLS_TRANSPORT)
    return
  end

  logger.debug { 'ConnectionManager: Opening a websocket transport connection' }

  # The socket attempt can fail at the same time as a timer firing so ensure
  # only one outcome is processed from this setup attempt
  setup_attempt_status = {}
  setup_failed = lambda do
    return true if setup_attempt_status[:failed]
    setup_attempt_status[:failed] = true
    false
  end

  connection.create_websocket_transport.tap do |socket_deferrable|
    socket_deferrable.callback do |websocket_transport|
      subscribe_to_transport_events websocket_transport
      yield websocket_transport if block_given?
    end

    socket_deferrable.errback do |error|
      next if setup_failed.call
      connection_opening_failed error
    end
  end

  # The connection request timeout must be marginally higher than the REST request timeout to ensure
  # any HTTP auth request failure due to timeout triggers before the connection timer kicks in
  logger.debug { "ConnectionManager: Setting up automatic connection timeout timer for #{realtime_request_timeout}s" }
  create_timeout_timer_whilst_in_state(:connecting, realtime_request_timeout) do
    next if setup_failed.call
    connection_opening_failed Ably::Exceptions::ConnectionTimeout.new("Connection to Ably timed out after #{realtime_request_timeout}s", nil, Ably::Exceptions::Codes::CONNECTION_TIMED_OUT)
  end
end
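The `setup_failed` lambda above is a once-only guard: it returns `false` the first time it is called and `true` on every later call, so whichever of the socket errback or the timeout timer fires first is the only one that reports the failure. A minimal sketch of the same guard outside the connection manager, with `build_failure_guard` as a hypothetical name:

```ruby
# Builds a guard that lets only the first caller through.
def build_failure_guard
  status = {}
  lambda do
    # `return` inside a lambda returns from the lambda only.
    return true if status[:failed]
    status[:failed] = true
    false
  end
end

already_failed = build_failure_guard
handled = []

# Simulate the socket errback and the timeout timer both firing.
[:socket_error, :timeout].each do |source|
  next if already_failed.call
  handled << source
end

puts handled.inspect # prints [:socket_error]
```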
#suspend_active_channels(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection/connection_manager.rb', line 305
def suspend_active_channels(error)
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching?
  end.each do |channel|
    channel.transition_state_machine! :suspended, reason: error
  end
end
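`suspend_active_channels`, `detach_active_channels` and `fail_active_channels` share one shape: select the channels in an active state, then transition each of them. The sketch below shows that shape with a plain Struct in place of a real channel; `SketchChannel`, `ACTIVE_STATES` and `transition_active_channels` are hypothetical names, and assigning `state` directly stands in for the library's state machine transition.

```ruby
# Hypothetical stand-in for a channel with a mutable state.
SketchChannel = Struct.new(:name, :state)

# States treated as active by suspend_active_channels and detach_active_channels.
ACTIVE_STATES = %i[attached attaching detaching].freeze

# Moves every active channel to new_state and returns the channels that changed.
def transition_active_channels(channels, new_state)
  channels
    .select { |channel| ACTIVE_STATES.include?(channel.state) }
    .each { |channel| channel.state = new_state }
end

channels = [
  SketchChannel.new('chat', :attached),
  SketchChannel.new('logs', :initialized),
  SketchChannel.new('news', :attaching)
]

changed = transition_active_channels(channels, :suspended)
puts changed.map(&:name).inspect
puts channels.map(&:state).inspect
```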