Class: Ably::Realtime::Connection
- Inherits: Object
  - Object
  - Ably::Realtime::Connection
- Extended by: Modules::Enum
- Includes: Modules::Conversions, Modules::EventEmitter, Modules::SafeYield, Modules::StateEmitter, Modules::UsesStateMachine
- Defined in: lib/ably/realtime/connection.rb, lib/ably/realtime/connection/connection_manager.rb, lib/ably/realtime/connection/websocket_transport.rb, lib/ably/realtime/connection/connection_state_machine.rb
Overview
The Connection class represents the connection associated with an Ably Realtime instance. The Connection object exposes the lifecycle and parameters of the realtime connection.
Connections will always be in one of the following states:
initialized: 0
connecting: 1
connected: 2
disconnected: 3
suspended: 4
closing: 5
closed: 6
failed: 7
Note that the states are available as Enum-like constants:
Connection::STATE.Initialized
Connection::STATE.Connecting
Connection::STATE.Connected
Connection::STATE.Disconnected
Connection::STATE.Suspended
Connection::STATE.Closing
Connection::STATE.Closed
Connection::STATE.Failed
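As a rough illustration of the numbering above, the following plain-Ruby sketch maps each state symbol to its index and to the capitalised accessor name. It does not use the library's `ruby_enum` helper; `STATE_NAMES`, `state_index` and `state_accessor` are hypothetical names introduced only for this example.

```ruby
# Plain-Ruby illustration only; the library itself builds STATE with ruby_enum.
STATE_NAMES = %i[
  initialized connecting connected disconnected
  suspended closing closed failed
].freeze

# Index of a state symbol, matching the list above (initialized is 0).
def state_index(state)
  STATE_NAMES.index(state) or raise ArgumentError, "Unknown state: #{state}"
end

# Capitalised accessor name, e.g. :connected becomes "Connected".
def state_accessor(state)
  state.to_s.capitalize
end

STATE_NAMES.each do |state|
  puts "#{state_index(state)}: Connection::STATE.#{state_accessor(state)}"
end
```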
Connection emits errors; use `on(:error)` to subscribe to them.
Defined Under Namespace
Classes: ConnectionManager, ConnectionStateMachine, WebsocketTransport
Constant Summary
- STATE =
Valid Connection states
ruby_enum('STATE', :initialized, :connecting, :connected, :disconnected, :suspended, :closing, :closed, :failed )
- RECOVER_REGEX =
Expected format for a connection recover key
/^(?<recover>[\w!-]+):(?<connection_serial>\-?\w+)$/
- DEFAULTS =
Defaults for automatic connection recovery and timeouts
{
  disconnected_retry_timeout: 15, # when the connection enters the DISCONNECTED state, after this delay in seconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
  suspended_retry_timeout: 30, # when the connection enters the SUSPENDED state, after this delay in seconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically
  connection_state_ttl: 60, # the duration that Ably will persist the connection state when a Realtime client is abruptly disconnected
  realtime_request_timeout: 10 # default timeout when establishing a connection, or sending a HEARTBEAT, CONNECT, ATTACH, DETACH or CLOSE ProtocolMessage
}.freeze
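To make the RECOVER_REGEX format concrete, here is a minimal sketch that applies the same regular expression to a few strings. The helper name `parse_recover_key` and the sample strings are made up for illustration and are not real connection keys.

```ruby
# Regular expression copied from the RECOVER_REGEX constant above.
RECOVER_REGEX = /^(?<recover>[\w!-]+):(?<connection_serial>\-?\w+)$/

# Split a recover string into its key and serial parts.
# Returns nil when the string does not match the expected format.
def parse_recover_key(recover)
  match = recover.to_s.match(RECOVER_REGEX)
  return nil unless match

  { recover: match[:recover], connection_serial: match[:connection_serial] }
end

p parse_recover_key('abc123!def-456:7')
p parse_recover_key('abc123:-1')
p parse_recover_key('not a recover key')
```

The part before the colon may contain word characters, `!` and `-`; the serial after it may carry a leading minus sign.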
Instance Attribute Summary
-
#__incoming_protocol_msgbus__ ⇒ Ably::Util::PubSub
readonly
private
Client library internal incoming protocol message bus.
-
#__outgoing_message_queue__ ⇒ Array
readonly
private
An internal queue used to manage unsent outgoing messages.
-
#__outgoing_protocol_msgbus__ ⇒ Ably::Util::PubSub
readonly
private
Client library internal outgoing protocol message bus.
-
#__pending_message_ack_queue__ ⇒ Array
readonly
private
An internal queue used to manage sent messages.
-
#client ⇒ Ably::Realtime::Client
readonly
Client associated with this connection.
-
#current_host ⇒ String
readonly
private
The current host that is configured following a call to method #determine_host.
-
#defaults ⇒ Hash
readonly
Configured recovery and timeout defaults for this Connection.
-
#details ⇒ Ably::Models::ConnectionDetails
readonly
Connection details of the currently established connection.
-
#error_reason ⇒ Ably::Models::ErrorInfo, Ably::Exceptions::BaseAblyException
readonly
When a connection failure occurs this attribute contains the Ably Exception.
-
#id ⇒ String
readonly
A unique public identifier for this connection, used to identify this member in presence events and messages.
-
#key ⇒ String
readonly
A unique private connection key used to recover this connection, assigned by Ably.
-
#logger ⇒ Logger
readonly
The Logger for this client.
-
#manager ⇒ Ably::Realtime::Connection::ConnectionManager
readonly
private
The Connection manager responsible for creating, maintaining and closing the connection and underlying transport.
-
#port ⇒ Integer
readonly
The default port used for this connection.
-
#recovery_key ⇒ String
readonly
Recovery key that can be used by another client to recover this connection with the :recover option.
-
#serial ⇒ Integer
readonly
The serial number of the last message to be received on this connection, used to recover or resume a connection.
-
#state ⇒ Ably::Realtime::Connection::STATE
readonly
Connection state.
-
#transport ⇒ Ably::Realtime::Connection::WebsocketTransport
readonly
private
Underlying socket transport used for this connection, for internal use by the client library.
Attributes included from Modules::UsesStateMachine
#previous_state, #state_history
Instance Method Summary
- #add_message_to_outgoing_queue(protocol_message) ⇒ Object private
-
#can_publish_messages? ⇒ Boolean
private
Returns false if messages cannot be published as a result of message queueing being disabled.
- #clear_error_reason ⇒ Object private
-
#close { ... } ⇒ EventMachine::Deferrable
Causes the connection to close, entering the closed state, from any state except the failed state.
-
#configure_new(connection_id, connection_key, connection_serial) ⇒ void
private
Following a new connection being made, the connection ID, connection key and message serial need to match the details provided by the server.
-
#connect { ... } ⇒ EventMachine::Deferrable
Causes the library to attempt connection.
- #create_websocket_transport ⇒ EventMachine::Deferrable private
-
#determine_host {|String| ... } ⇒ Object
private
Determines the correct host name to use for the next connection attempt and updates current_host.
-
#initialize(client, options) ⇒ Connection
constructor
A new instance of Connection.
- #internet_up? {|Boolean| ... } ⇒ EventMachine::Deferrable private
- #notify_message_dispatcher_of_new_message(protocol_message) ⇒ Object private
-
#off_resume(&callback) ⇒ Object
private
Remove a registered connection resume callback.
-
#on_resume(&callback) ⇒ Object
private
Provides a simple hook to inject a callback when a connection is successfully resumed.
-
#ping {|Integer| ... } ⇒ void
Sends a ping to Ably and yields the provided block when a heartbeat ping request is echoed from the server.
- #release_websocket_transport ⇒ Object private
-
#reset_resume_info ⇒ void
private
Disable automatic resume of a connection.
-
#resumed ⇒ Object
private
Executes registered callbacks for a successful connection resume event.
-
#send_protocol_message(protocol_message) ⇒ void
private
Add protocol message to the outgoing message queue and notify the dispatcher that a message is ready to be sent.
- #set_connection_details(connection_details) ⇒ Object private
- #set_failed_connection_error_reason(error) ⇒ Object private
-
#update_connection_serial(connection_serial) ⇒ void
private
Store last received connection serial so that the connection can be resumed from the last known point-in-time.
Methods included from Modules::UsesStateMachine
#synchronize_state_with_statemachine, #transition_state_machine, #transition_state_machine!
Methods included from Modules::StateEmitter
#once_or_if, #once_state_changed, #state=, #state?, #unsafe_once_or_if, #unsafe_once_state_changed
Methods included from Modules::EventEmitter
#emit, #off, #on, #once, #unsafe_on, #unsafe_once
Constructor Details
#initialize(client, options) ⇒ Connection
Returns a new instance of Connection.
# File 'lib/ably/realtime/connection.rb', line 122
def initialize(client, options)
  @client = client
  @client_serial = -1
  @__outgoing_message_queue__ = []
  @__pending_message_ack_queue__ = []

  @defaults = DEFAULTS.dup
  options.each do |key, val|
    @defaults[key] = val if DEFAULTS.has_key?(key)
  end if options.kind_of?(Hash)
  @defaults.freeze

  Client::IncomingMessageDispatcher.new client, self
  Client::OutgoingMessageDispatcher.new client, self

  @state_machine = ConnectionStateMachine.new(self)
  @state = STATE(state_machine.current_state)
  @manager = ConnectionManager.new(self)
end
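The way the constructor combines the supplied options with DEFAULTS can be sketched in isolation. The snippet below is a simplified stand-alone version, not the library code: `build_defaults` is a hypothetical helper, and the values are copied from the DEFAULTS constant documented on this page.

```ruby
# Values copied from the DEFAULTS constant documented on this page.
DEFAULTS = {
  disconnected_retry_timeout: 15,
  suspended_retry_timeout: 30,
  connection_state_ttl: 60,
  realtime_request_timeout: 10
}.freeze

# Mirrors the merge in #initialize: only keys already present in DEFAULTS
# are overridden, anything else is ignored, and the result is frozen.
def build_defaults(options)
  defaults = DEFAULTS.dup
  if options.kind_of?(Hash)
    options.each do |key, val|
      defaults[key] = val if DEFAULTS.has_key?(key)
    end
  end
  defaults.freeze
end

merged = build_defaults({ realtime_request_timeout: 5, unrelated_option: true })
puts merged.inspect
```

Unknown keys such as `unrelated_option` are dropped silently, and a non-Hash argument leaves the defaults untouched.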
Instance Attribute Details
#__incoming_protocol_msgbus__ ⇒ Ably::Util::PubSub (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the client library's internal incoming protocol message bus.
# File 'lib/ably/realtime/connection.rb', line 313
def __incoming_protocol_msgbus__
  @__incoming_protocol_msgbus__ ||=
end
#__outgoing_message_queue__ ⇒ Array (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
An internal queue used to manage unsent outgoing messages. You should never interface with this array directly
# File 'lib/ably/realtime/connection.rb', line 108
def __outgoing_message_queue__
  @__outgoing_message_queue__
end
#__outgoing_protocol_msgbus__ ⇒ Ably::Util::PubSub (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the client library's internal outgoing protocol message bus.
# File 'lib/ably/realtime/connection.rb', line 306
def __outgoing_protocol_msgbus__
  @__outgoing_protocol_msgbus__ ||=
end
#__pending_message_ack_queue__ ⇒ Array (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
An internal queue used to manage sent messages. You should never interface with this array directly
# File 'lib/ably/realtime/connection.rb', line 113
def __pending_message_ack_queue__
  @__pending_message_ack_queue__
end
#client ⇒ Ably::Realtime::Client (readonly)
Ably::Realtime::Client associated with this connection
# File 'lib/ably/realtime/connection.rb', line 93
def client
  @client
end
#current_host ⇒ String (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the current host, which is configured by a call to #determine_host.
# File 'lib/ably/realtime/connection.rb', line 340
def current_host
  @current_host
end
#defaults ⇒ Hash (readonly)
Configured recovery and timeout defaults for this Ably::Realtime::Connection. See the configurable options in Ably::Realtime::Client#initialize. The defaults are immutable
# File 'lib/ably/realtime/connection.rb', line 119
def defaults
  @defaults
end
#details ⇒ Ably::Models::ConnectionDetails (readonly)
Connection details of the currently established connection
# File 'lib/ably/realtime/connection.rb', line 89
def details
  @details
end
#error_reason ⇒ Ably::Models::ErrorInfo, Ably::Exceptions::BaseAblyException (readonly)
When a connection failure occurs this attribute contains the Ably Exception
# File 'lib/ably/realtime/connection.rb', line 85
def error_reason
  @error_reason
end
#id ⇒ String (readonly)
A unique public identifier for this connection, used to identify this member in presence events and messages
# File 'lib/ably/realtime/connection.rb', line 73
def id
  @id
end
#key ⇒ String (readonly)
A unique private connection key used to recover this connection, assigned by Ably
# File 'lib/ably/realtime/connection.rb', line 77
def key
  @key
end
#logger ⇒ Logger (readonly)
Returns the Logger for this client. Configure the log level with the `:log_level` option; refer to Ably::Realtime::Client#initialize.
# File 'lib/ably/realtime/connection.rb', line 351
def logger
  client.logger
end
#manager ⇒ Ably::Realtime::Connection::ConnectionManager (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The Connection manager responsible for creating, maintaining and closing the connection and underlying transport
# File 'lib/ably/realtime/connection.rb', line 103
def manager
  @manager
end
#port ⇒ Integer (readonly)
Returns the default port used for this connection.
# File 'lib/ably/realtime/connection.rb', line 344
def port
  client.use_tls? ? client.custom_tls_port || 443 : client.custom_port || 80
end
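The port selection can be tried out without a real client. In this sketch `FakeClient` is a hypothetical stand-in that only provides the three members the expression reads, and `port_for` is a made-up wrapper around the same expression as #port.

```ruby
# Hypothetical stand-in for the client; only the members used by #port.
FakeClient = Struct.new(:tls, :custom_tls_port, :custom_port) do
  def use_tls?
    tls
  end
end

# Same expression as Connection#port, taking the client as an argument.
# A custom port wins; otherwise 443 is used with TLS and 80 without.
def port_for(client)
  client.use_tls? ? client.custom_tls_port || 443 : client.custom_port || 80
end

puts port_for(FakeClient.new(true, nil, nil))
puts port_for(FakeClient.new(false, nil, 8080))
```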
#recovery_key ⇒ String (readonly)
Returns the recovery key that can be used by another client to recover this connection with the :recover option.
# File 'lib/ably/realtime/connection.rb', line 271
def recovery_key
  "#{key}:#{serial}" if connection_resumable?
end
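A small model shows when a recovery key is available. `ConnectionInfo` is a hypothetical Struct holding only `key` and `serial`; the resumable check follows the private `connection_resumable?` method in the class source, and the sample key is invented.

```ruby
# Hypothetical minimal model holding only the key and serial attributes.
ConnectionInfo = Struct.new(:key, :serial) do
  # Follows the private connection_resumable? check in the class source.
  def connection_resumable?
    !key.nil? && !serial.nil?
  end

  # Returns "key:serial", or nil when either part is missing.
  def recovery_key
    "#{key}:#{serial}" if connection_resumable?
  end
end

p ConnectionInfo.new('abc123', 5).recovery_key
p ConnectionInfo.new(nil, nil).recovery_key
```

The second call models the state after #reset_resume_info, where both values are nil and no recovery key is produced.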
#serial ⇒ Integer (readonly)
The serial number of the last message to be received on this connection, used to recover or resume a connection
# File 'lib/ably/realtime/connection.rb', line 81
def serial
  @serial
end
#state ⇒ Ably::Realtime::Connection::STATE (readonly)
Returns the connection state.
# File 'lib/ably/realtime/connection.rb', line 39
class Connection
  include Ably::Modules::EventEmitter
  include Ably::Modules::Conversions
  include Ably::Modules::SafeYield
  extend Ably::Modules::Enum

  # Valid Connection states
  STATE = ruby_enum('STATE',
    :initialized,
    :connecting,
    :connected,
    :disconnected,
    :suspended,
    :closing,
    :closed,
    :failed
  )
  include Ably::Modules::StateEmitter
  include Ably::Modules::UsesStateMachine
  ensure_state_machine_emits 'Ably::Models::ConnectionStateChange'

  # Expected format for a connection recover key
  RECOVER_REGEX = /^(?<recover>[\w!-]+):(?<connection_serial>\-?\w+)$/

  # Defaults for automatic connection recovery and timeouts
  DEFAULTS = {
    disconnected_retry_timeout: 15, # when the connection enters the DISCONNECTED state, after this delay in seconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
    suspended_retry_timeout: 30, # when the connection enters the SUSPENDED state, after this delay in seconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically
    connection_state_ttl: 60, # the duration that Ably will persist the connection state when a Realtime client is abruptly disconnected
    realtime_request_timeout: 10 # default timeout when establishing a connection, or sending a HEARTBEAT, CONNECT, ATTACH, DETACH or CLOSE ProtocolMessage
  }.freeze

  # A unique public identifier for this connection, used to identify this member in presence events and messages
  # @return [String]
  attr_reader :id

  # A unique private connection key used to recover this connection, assigned by Ably
  # @return [String]
  attr_reader :key

  # The serial number of the last message to be received on this connection, used to recover or resume a connection
  # @return [Integer]
  attr_reader :serial

  # When a connection failure occurs this attribute contains the Ably Exception
  # @return [Ably::Models::ErrorInfo,Ably::Exceptions::BaseAblyException]
  attr_reader :error_reason

  # Connection details of the currently established connection
  # @return [Ably::Models::ConnectionDetails]
  attr_reader :details

  # {Ably::Realtime::Client} associated with this connection
  # @return [Ably::Realtime::Client]
  attr_reader :client

  # Underlying socket transport used for this connection, for internal use by the client library
  # @return [Ably::Realtime::Connection::WebsocketTransport]
  # @api private
  attr_reader :transport

  # The Connection manager responsible for creating, maintaining and closing the connection and underlying transport
  # @return [Ably::Realtime::Connection::ConnectionManager]
  # @api private
  attr_reader :manager

  # An internal queue used to manage unsent outgoing messages. You should never interface with this array directly
  # @return [Array]
  # @api private
  attr_reader :__outgoing_message_queue__

  # An internal queue used to manage sent messages. You should never interface with this array directly
  # @return [Array]
  # @api private
  attr_reader :__pending_message_ack_queue__

  # Configured recovery and timeout defaults for this {Connection}.
  # See the configurable options in {Ably::Realtime::Client#initialize}.
  # The defaults are immutable
  # @return [Hash]
  attr_reader :defaults

  # @api public
  def initialize(client, options)
    @client = client
    @client_serial = -1
    @__outgoing_message_queue__ = []
    @__pending_message_ack_queue__ = []

    @defaults = DEFAULTS.dup
    options.each do |key, val|
      @defaults[key] = val if DEFAULTS.has_key?(key)
    end if options.kind_of?(Hash)
    @defaults.freeze

    Client::IncomingMessageDispatcher.new client, self
    Client::OutgoingMessageDispatcher.new client, self

    @state_machine = ConnectionStateMachine.new(self)
    @state = STATE(state_machine.current_state)
    @manager = ConnectionManager.new(self)
  end

  # Causes the connection to close, entering the closed state, from any state except
  # the failed state. Once closed, the library will not attempt to re-establish the
  # connection without a call to {Connection#connect}.
  #
  # @yield block is called as soon as this connection is in the Closed state
  #
  # @return [EventMachine::Deferrable]
  #
  def close(&success_block)
    unless closing? || closed?
      raise exception_for_state_change_to(:closing) unless can_transition_to?(:closing)
      transition_state_machine :closing
    end
    deferrable_for_state_change_to(STATE.Closed, &success_block)
  end

  # Causes the library to attempt connection. If it was previously explicitly
  # closed by the user, or was closed as a result of an unrecoverable error, a new connection will be opened.
  # Succeeds when connection is established i.e. state is @Connected@
  # Fails when state becomes either @Closing@, @Closed@ or @Failed@
  #
  # Note that if the connection remains in the disconnected and suspended states indefinitely,
  # the Deferrable or block provided may never be called
  #
  # @yield block is called as soon as this connection is in the Connected state
  #
  # @return [EventMachine::Deferrable]
  #
  def connect(&success_block)
    unless connecting? || connected?
      raise exception_for_state_change_to(:connecting) unless can_transition_to?(:connecting)
      transition_state_machine :connecting
    end

    Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
      deferrable.callback do
        yield if block_given?
      end
      succeed_callback = deferrable.method(:succeed)
      fail_callback = deferrable.method(:fail)

      once(:connected) do
        deferrable.succeed
        off(&fail_callback)
      end

      once(:failed, :closed, :closing) do
        deferrable.fail
        off(&succeed_callback)
      end
    end
  end

  # Sends a ping to Ably and yields the provided block when a heartbeat ping request is echoed from the server.
  # This can be useful for measuring true roundtrip client to Ably server latency for a simple message, or checking that an underlying transport is responding currently.
  # The elapsed milliseconds is passed as an argument to the block and represents the time taken to echo a ping heartbeat once the connection is in the `:connected` state.
  #
  # @yield [Integer] if a block is passed to this method, then this block will be called once the ping heartbeat is received with the time elapsed in milliseconds.
  # If the ping is not received within an acceptable timeframe, the block will be called with +nil+ as the first argument
  #
  # @example
  #   client = Ably::Realtime::Client.new(key: 'key.id:secret')
  #   client.connection.ping do |ms_elapsed|
  #     puts "Ping took #{ms_elapsed}ms"
  #   end
  #
  # @return [void]
  #
  def ping(&block)
    raise RuntimeError, 'Cannot send a ping when connection is not open' if initialized?
    raise RuntimeError, 'Cannot send a ping when connection is in a closed or failed state' if closed? || failed?

    started = nil
    finished = false

    wait_for_ping = Proc.new do ||
      next if finished
      if .action == Ably::Models::ProtocolMessage::ACTION.Heartbeat
        finished = true
        __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
        time_passed = (Time.now.to_f * 1000 - started.to_f * 1000).to_i
        safe_yield block, time_passed if block_given?
      end
    end

    once_or_if(STATE.Connected) do
      next if finished
      started = Time.now
       action: Ably::Models::ProtocolMessage::ACTION.Heartbeat.to_i
      __incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping
    end

    EventMachine.add_timer(defaults.fetch(:realtime_request_timeout)) do
      next if finished
      finished = true
      __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
      logger.warn "Ping timed out after #{defaults.fetch(:realtime_request_timeout)}s"
      safe_yield block, nil if block_given?
    end
  end

  # @yield [Boolean] True if an internet connection check appears to be up following an HTTP request to a reliable CDN
  # @return [EventMachine::Deferrable]
  # @api private
  def internet_up?
    url = "http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}"
    EventMachine::DefaultDeferrable.new.tap do |deferrable|
      EventMachine::HttpRequest.new(url).get.tap do |http|
        http.errback do
          yield false if block_given?
          deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unable to connect to #{url}", nil, 80000)
        end
        http.callback do
          EventMachine.next_tick do
            result = http.response_header.status == 200 && http.response.strip == Ably::INTERNET_CHECK.fetch(:ok_text)
            yield result if block_given?
            if result
              deferrable.succeed
            else
              deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unexpected response from #{url} (#{http.response_header.status})", 400, 40000)
            end
          end
        end
      end
    end
  end

  # @!attribute [r] recovery_key
  # @return [String] recovery key that can be used by another client to recover this connection with the :recover option
  def recovery_key
    "#{key}:#{serial}" if connection_resumable?
  end

  # Following a new connection being made, the connection ID, connection key
  # and message serial need to match the details provided by the server.
  #
  # @return [void]
  # @api private
  def configure_new(connection_id, connection_key, connection_serial)
    @id = connection_id
    @key = connection_key
    @client_serial = connection_serial

    update_connection_serial connection_serial
  end

  # Store last received connection serial so that the connection can be resumed from the last known point-in-time
  # @return [void]
  # @api private
  def update_connection_serial(connection_serial)
    @serial = connection_serial
  end

  # Disable automatic resume of a connection
  # @return [void]
  # @api private
  def reset_resume_info
    @key = nil
    @serial = nil
  end

  # @!attribute [r] __outgoing_protocol_msgbus__
  # @return [Ably::Util::PubSub] Client library internal outgoing protocol message bus
  # @api private
  def __outgoing_protocol_msgbus__
    @__outgoing_protocol_msgbus__ ||=
  end

  # @!attribute [r] __incoming_protocol_msgbus__
  # @return [Ably::Util::PubSub] Client library internal incoming protocol message bus
  # @api private
  def __incoming_protocol_msgbus__
    @__incoming_protocol_msgbus__ ||=
  end

  # Determines the correct host name to use for the next connection attempt and updates current_host
  # @yield [String] The host name used for this connection, for network connection failures a {Ably::FALLBACK_HOSTS fallback host} is used to route around networking or intermittent problems if an Internet connection is available
  # @api private
  def determine_host
    raise ArgumentError, 'Block required' unless block_given?

    if can_use_fallback_hosts?
      internet_up? do |internet_is_up_result|
        @current_host = if internet_is_up_result
          client.fallback_endpoint.host
        else
          client.endpoint.host
        end
        yield current_host
      end
    else
      @current_host = client.endpoint.host
      yield current_host
    end
  end

  # @return [String] The current host that is configured following a call to method {#determine_host}
  # @api private
  attr_reader :current_host

  # @!attribute [r] port
  # @return [Integer] The default port used for this connection
  def port
    client.use_tls? ? client.custom_tls_port || 443 : client.custom_port || 80
  end

  # @!attribute [r] logger
  # @return [Logger] The {Ably::Logger} for this client.
  # Configure the log_level with the `:log_level` option, refer to {Ably::Realtime::Client#initialize}
  def logger
    client.logger
  end

  # Add protocol message to the outgoing message queue and notify the dispatcher that a message is
  # ready to be sent
  #
  # @param [Ably::Models::ProtocolMessage] protocol_message
  # @return [void]
  # @api private
  def send_protocol_message(protocol_message)
    () do
      Ably::Models::ProtocolMessage.new(, logger: logger).tap do ||
        logger.debug("Connection: Prot msg queued =>: #{.action} #{}")
      end
    end
  end

  # @api private
  def add_message_to_outgoing_queue(protocol_message)
    __outgoing_message_queue__ << protocol_message
  end

  # @api private
  def notify_message_dispatcher_of_new_message(protocol_message)
    __outgoing_protocol_msgbus__.publish :protocol_message, protocol_message
  end

  # @return [EventMachine::Deferrable]
  # @api private
  def create_websocket_transport
    EventMachine::DefaultDeferrable.new.tap do |websocket_deferrable|
      # Getting auth params can be blocking so uses a Deferrable
      client.auth.auth_params.tap do |auth_deferrable|
        auth_deferrable.callback do |auth_params|
          url_params = auth_params.merge(
            format: client.protocol,
            echo: client.
          )

          url_params['clientId'] = client.auth.client_id if client.auth.has_client_id?

          if connection_resumable?
            url_params.merge! resume: key, connection_serial: serial
            logger.debug "Resuming connection key #{key} with serial #{serial}"
          elsif connection_recoverable?
            url_params.merge! recover: connection_recover_parts[:recover], connection_serial: connection_recover_parts[:connection_serial]
            logger.debug "Recovering connection with key #{client.recover}"
            once(:connected, :closed, :failed) do
              client.disable_automatic_connection_recovery
            end
          end

          url = URI(client.endpoint).tap do |endpoint|
            endpoint.query = URI.encode_www_form(url_params)
          end.to_s

          determine_host do |host|
            begin
              logger.debug "Connection: Opening socket connection to #{host}:#{port} and URL '#{url}'"
              @transport = EventMachine.connect(host, port, WebsocketTransport, self, url) do |websocket_transport|
                websocket_deferrable.succeed websocket_transport
              end
            rescue EventMachine::ConnectionError => error
              websocket_deferrable.fail error
            end
          end
        end

        auth_deferrable.errback do |error|
          websocket_deferrable.fail error
        end
      end
    end
  end

  # @api private
  def release_websocket_transport
    @transport = nil
  end

  # @api private
  def set_failed_connection_error_reason(error)
    @error_reason = error
  end

  # @api private
  def clear_error_reason
    @error_reason = nil
  end

  # @api private
  def set_connection_details(connection_details)
    @details = connection_details
  end

  # Executes registered callbacks for a successful connection resume event
  # @api private
  def resumed
    resume_callbacks.each(&:call)
  end

  # Provides a simple hook to inject a callback when a connection is successfully resumed
  # @api private
  def on_resume(&callback)
    resume_callbacks << callback
  end

  # Remove a registered connection resume callback
  # @api private
  def off_resume(&callback)
    resume_callbacks.delete(callback)
  end

  # Returns false if messages cannot be published as a result of message queueing being disabled
  # @api private
  def can_publish_messages?
    connected? ||
      ( (initialized? || connecting? || disconnected?) && client. )
  end

  # As we are using a state machine, do not allow change_state to be used
  # #transition_state_machine must be used instead
  private :change_state

  private

  # The client serial is incremented for every message that is published that requires an ACK.
  # Note that this is different to the connection serial that contains the last known serial number
  # received from the server.
  #
  # A client serial number therefore does not guarantee a message has been received, only sent.
  # A connection serial guarantees the server has received the message and is thus used for connection
  # recovery and resumes.
  # @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent
  def client_serial
    @client_serial
  end

  def resume_callbacks
    @resume_callbacks ||= []
  end

  def
    Ably::Util::PubSub.new(
      coerce_into: Proc.new do |event|
        raise KeyError, "Expected :protocol_message, :#{event} is disallowed" unless event == :protocol_message
        :protocol_message
      end
    )
  end

  def ()
    if Ably::Models::ProtocolMessage.ack_required?([:action])
      () { yield }
    else
      yield
    end
  end

  def ()
    @client_serial += 1
    [:msgSerial] = client_serial
    yield
  rescue StandardError => e
    @client_serial -= 1
    raise e
  end

  # Simply wait until the next EventMachine tick to ensure Connection initialization is complete
  def when_initialized
    EventMachine.next_tick { yield }
  end

  def connection_resumable?
    !key.nil? && !serial.nil?
  end

  def connection_recoverable?
    connection_recover_parts
  end

  def connection_recover_parts
    client.recover.to_s.match(RECOVER_REGEX)
  end

  def production?
    client.environment.nil? || client.environment == :production
  end

  def custom_port?
    if client.use_tls?
      !!client.custom_tls_port
    else
      !!client.custom_port
    end
  end

  def custom_host?
    !!client.custom_realtime_host
  end

  def can_use_fallback_hosts?
    if production? && !custom_port? && !custom_host?
      if connecting? && previous_state
        use_fallback_if_disconnected? || use_fallback_if_suspended?
      end
    end
  end

  def use_fallback_if_disconnected?
    second_reconnect_attempt_for(:disconnected, 1)
  end

  def use_fallback_if_suspended?
    second_reconnect_attempt_for(:suspended, 2) # on first suspended state use default Ably host again
  end

  def second_reconnect_attempt_for(state, first_attempt_count)
    previous_state == state && manager.retry_count_for_state(state) >= first_attempt_count
  end
end
#transport ⇒ Ably::Realtime::Connection::WebsocketTransport (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Underlying socket transport used for this connection, for internal use by the client library
# File 'lib/ably/realtime/connection.rb', line 98
def transport
  @transport
end
Instance Method Details
#add_message_to_outgoing_queue(protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection.rb', line 372
def add_message_to_outgoing_queue(protocol_message)
  __outgoing_message_queue__ << protocol_message
end
#can_publish_messages? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns false if messages cannot be published as a result of message queueing being disabled
# File 'lib/ably/realtime/connection.rb', line 469
def can_publish_messages?
  connected? ||
    ( (initialized? || connecting? || disconnected?) && client. )
end
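The rule can be read as a small truth table: publishing is possible when connected, or when the connection is initialized, connecting or disconnected and queueing is allowed. The sketch below is an assumption-laden model, not the library method: `can_publish?` and its `queueing_enabled` flag are hypothetical stand-ins, since the client setting consulted by the listing is not legible here.

```ruby
# Hypothetical model of the rule; queueing_enabled stands in for the client
# setting that the listing reads, whose name is not shown on this page.
def can_publish?(state, queueing_enabled)
  return true if state == :connected

  %i[initialized connecting disconnected].include?(state) && queueing_enabled
end

%i[connected connecting suspended].each do |state|
  puts "#{state}: queueing on=#{can_publish?(state, true)} off=#{can_publish?(state, false)}"
end
```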
#clear_error_reason ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection.rb', line 440

def clear_error_reason
  @error_reason = nil
end
#close { ... } ⇒ EventMachine::Deferrable
Causes the connection to close, entering the closed state, from any state except the failed state. Once closed, the library will not attempt to re-establish the connection without a call to #connect.
# File 'lib/ably/realtime/connection.rb', line 150

def close(&success_block)
  unless closing? || closed?
    raise exception_for_state_change_to(:closing) unless can_transition_to?(:closing)
    transition_state_machine :closing
  end
  deferrable_for_state_change_to(STATE.Closed, &success_block)
end
#configure_new(connection_id, connection_key, connection_serial) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Following a new connection being made, the connection ID, connection key and message serial need to match the details provided by the server.
# File 'lib/ably/realtime/connection.rb', line 280

def configure_new(connection_id, connection_key, connection_serial)
  @id            = connection_id
  @key           = connection_key
  @client_serial = connection_serial

  update_connection_serial connection_serial
end
#connect { ... } ⇒ EventMachine::Deferrable
Causes the library to attempt connection. If it was previously explicitly closed by the user, or was closed as a result of an unrecoverable error, a new connection will be opened. Succeeds when the connection is established, i.e. the state is Connected. Fails when the state becomes Closing, Closed or Failed.
Note that if the connection remains in the disconnected and suspended states indefinitely, the Deferrable or block provided may never be called.
# File 'lib/ably/realtime/connection.rb', line 170

def connect(&success_block)
  unless connecting? || connected?
    raise exception_for_state_change_to(:connecting) unless can_transition_to?(:connecting)
    transition_state_machine :connecting
  end

  Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
    deferrable.callback do
      yield if block_given?
    end
    succeed_callback = deferrable.method(:succeed)
    fail_callback    = deferrable.method(:fail)
    once(:connected) do
      deferrable.succeed
      off(&fail_callback)
    end
    once(:failed, :closed, :closing) do
      deferrable.fail
      off(&succeed_callback)
    end
  end
end
#create_websocket_transport ⇒ EventMachine::Deferrable
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection.rb', line 383 def create_websocket_transport EventMachine::DefaultDeferrable.new.tap do |websocket_deferrable| # Getting auth params can be blocking so uses a Deferrable client.auth.auth_params.tap do |auth_deferrable| auth_deferrable.callback do |auth_params| url_params = auth_params.merge( format: client.protocol, echo: client. ) url_params['clientId'] = client.auth.client_id if client.auth.has_client_id? if connection_resumable? url_params.merge! resume: key, connection_serial: serial logger.debug "Resuming connection key #{key} with serial #{serial}" elsif connection_recoverable? url_params.merge! recover: connection_recover_parts[:recover], connection_serial: connection_recover_parts[:connection_serial] logger.debug "Recovering connection with key #{client.recover}" once(:connected, :closed, :failed) do client.disable_automatic_connection_recovery end end url = URI(client.endpoint).tap do |endpoint| endpoint.query = URI.encode_www_form(url_params) end.to_s determine_host do |host| begin logger.debug "Connection: Opening socket connection to #{host}:#{port} and URL '#{url}'" @transport = EventMachine.connect(host, port, WebsocketTransport, self, url) do |websocket_transport| websocket_deferrable.succeed websocket_transport end rescue EventMachine::ConnectionError => error websocket_deferrable.fail error end end end auth_deferrable.errback do |error| websocket_deferrable.fail error end end end end |
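The listing above builds the WebSocket URL by merging connection parameters into the auth parameters and encoding them as a query string. The sketch below shows that step in isolation using only Ruby's standard `uri` library. The `websocket_url` helper, the endpoint and the parameter values are hypothetical, and only the resume branch is modelled.

```ruby
require 'uri'

# Hypothetical helper: build a connection URL from an endpoint, some auth
# parameters and optional resume details (connection key and serial).
def websocket_url(endpoint, auth_params, key: nil, serial: nil)
  url_params = auth_params.merge(format: 'json')
  # Only ask the server to resume when both a key and a serial are known,
  # mirroring the connection_resumable? check.
  url_params.merge!(resume: key, connection_serial: serial) unless key.nil? || serial.nil?

  URI(endpoint).tap do |uri|
    uri.query = URI.encode_www_form(url_params)
  end.to_s
end

# Endpoint and token are invented for illustration.
puts websocket_url('wss://realtime.example.test/', { access_token: 'token' }, key: 'abc', serial: 7)
```

`URI.encode_www_form` takes care of escaping, so keys and serials containing reserved characters are still transmitted safely.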
#determine_host {|String| ... } ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Determines the correct host name to use for the next connection attempt and updates current_host
# File 'lib/ably/realtime/connection.rb', line 320

def determine_host
  raise ArgumentError, 'Block required' unless block_given?

  if can_use_fallback_hosts?
    internet_up? do |internet_is_up_result|
      @current_host = if internet_is_up_result
        client.fallback_endpoint.host
      else
        client.endpoint.host
      end
      yield current_host
    end
  else
    @current_host = client.endpoint.host
    yield current_host
  end
end
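Host selection combines two decisions: whether a fallback host may be used at all, and whether the internet check succeeded. The following sketch restates both as pure functions, assuming the connection is currently in the connecting state. The retry thresholds come from the `use_fallback_if_disconnected?` and `use_fallback_if_suspended?` helpers. The function names, the `default_host` flag (standing in for "production environment with no custom port or host") and the host names are hypothetical.

```ruby
# Retry counts at which a fallback host is first considered for each
# previous state, as used by the private helpers of this class.
FIRST_FALLBACK_ATTEMPT = { disconnected: 1, suspended: 2 }.freeze

# Hypothetical: decide whether a fallback host may be tried.
def fallback_allowed?(previous_state:, retry_count:, default_host: true)
  threshold = FIRST_FALLBACK_ATTEMPT[previous_state]
  return false if threshold.nil? || !default_host

  retry_count >= threshold
end

# Hypothetical: use the fallback only when it is allowed and the internet
# check passed; otherwise stay on the primary host.
def choose_host(primary:, fallback:, fallback_allowed:, internet_up:)
  fallback_allowed && internet_up ? fallback : primary
end

allowed = fallback_allowed?(previous_state: :disconnected, retry_count: 1)
puts choose_host(primary: 'realtime.example.test', fallback: 'fallback.example.test',
                 fallback_allowed: allowed, internet_up: true)
```

If the internet check fails, a fallback host would not help either, so the primary host is retried.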
#internet_up? {|Boolean| ... } ⇒ EventMachine::Deferrable
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection.rb', line 246

def internet_up?
  url = "http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}"
  EventMachine::DefaultDeferrable.new.tap do |deferrable|
    EventMachine::HttpRequest.new(url).get.tap do |http|
      http.errback do
        yield false if block_given?
        deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unable to connect to #{url}", nil, 80000)
      end
      http.callback do
        EventMachine.next_tick do
          result = http.response_header.status == 200 && http.response.strip == Ably::INTERNET_CHECK.fetch(:ok_text)
          yield result if block_given?
          if result
            deferrable.succeed
          else
            deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unexpected response from #{url} (#{http.response_header.status})", 400, 40000)
          end
        end
      end
    end
  end
end
#notify_message_dispatcher_of_new_message(protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection.rb', line 377

def notify_message_dispatcher_of_new_message(protocol_message)
  __outgoing_protocol_msgbus__.publish :protocol_message, protocol_message
end
#off_resume(&callback) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Remove a registered connection resume callback
# File 'lib/ably/realtime/connection.rb', line 463

def off_resume(&callback)
  resume_callbacks.delete(callback)
end
#on_resume(&callback) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Provides a simple hook to inject a callback when a connection is successfully resumed
# File 'lib/ably/realtime/connection.rb', line 457

def on_resume(&callback)
  resume_callbacks << callback
end
#ping {|Integer| ... } ⇒ void
This method returns an undefined value.
Sends a ping to Ably and yields to the provided block when the heartbeat ping request is echoed by the server. This is useful for measuring the true round-trip latency between the client and the Ably server for a simple message, or for checking that the underlying transport is currently responding. The elapsed time in milliseconds is passed to the block and represents the time taken to echo a ping heartbeat once the connection is in the `:connected` state. If no heartbeat arrives within `realtime_request_timeout`, the block is called with nil instead.
# File 'lib/ably/realtime/connection.rb', line 210

def ping(&block)
  raise RuntimeError, 'Cannot send a ping when connection is not open' if initialized?
  raise RuntimeError, 'Cannot send a ping when connection is in a closed or failed state' if closed? || failed?

  started = nil
  finished = false

  wait_for_ping = Proc.new do |protocol_message|
    next if finished
    if protocol_message.action == Ably::Models::ProtocolMessage::ACTION.Heartbeat
      finished = true
      __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
      time_passed = (Time.now.to_f * 1000 - started.to_f * 1000).to_i
      safe_yield block, time_passed if block_given?
    end
  end

  once_or_if(STATE.Connected) do
    next if finished
    started = Time.now
    send_protocol_message action: Ably::Models::ProtocolMessage::ACTION.Heartbeat.to_i
    __incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping
  end

  EventMachine.add_timer(defaults.fetch(:realtime_request_timeout)) do
    next if finished
    finished = true
    __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
    logger.warn "Ping timed out after #{defaults.fetch(:realtime_request_timeout)}s"
    safe_yield block, nil if block_given?
  end
end
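The ping logic is a race between two events, the heartbeat echo and the timeout timer, guarded by a `finished` flag so that the block runs exactly once. The sketch below isolates that pattern without EventMachine. `PingTracker` is a hypothetical class written for this illustration, and timestamps are passed in explicitly so the behaviour is deterministic.

```ruby
# Hypothetical stand-alone model of the "first event wins" guard used by ping.
class PingTracker
  def initialize(started_at, &block)
    @started_at = started_at
    @block = block
    @finished = false
  end

  # Called when the heartbeat echo arrives: reports elapsed whole milliseconds,
  # computed the same way as in the listing above.
  def heartbeat_received(now = Time.now)
    finish((now.to_f * 1000 - @started_at.to_f * 1000).to_i)
  end

  # Called when the timeout fires: reports nil in place of a latency.
  def timed_out
    finish(nil)
  end

  private

  # Whichever event arrives first wins; later events are ignored.
  def finish(result)
    return if @finished

    @finished = true
    @block.call(result)
  end
end

results = []
tracker = PingTracker.new(Time.at(100)) { |elapsed_ms| results << elapsed_ms }
tracker.heartbeat_received(Time.at(100.25))
tracker.timed_out # ignored, the heartbeat already completed the ping
puts results.inspect
```

A caller of the real `ping` should therefore be prepared for the block argument to be nil when the request times out.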
#release_websocket_transport ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection.rb', line 430

def release_websocket_transport
  @transport = nil
end
#reset_resume_info ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Disable automatic resume of a connection
# File 'lib/ably/realtime/connection.rb', line 298

def reset_resume_info
  @key    = nil
  @serial = nil
end
#resumed ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Executes registered callbacks for a successful connection resume event
# File 'lib/ably/realtime/connection.rb', line 451

def resumed
  resume_callbacks.each(&:call)
end
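Taken together, `on_resume`, `off_resume` and `resumed` form a small callback registry. The sketch below reproduces that pattern in a hypothetical `ResumeHooks` class to show one practical consequence: a callback can only be removed if the caller kept a reference to the same Proc object that was registered.

```ruby
# Hypothetical stand-alone registry mirroring the three resume methods.
class ResumeHooks
  def on_resume(&callback)
    callbacks << callback
  end

  def off_resume(&callback)
    callbacks.delete(callback)
  end

  def resumed
    callbacks.each(&:call)
  end

  private

  def callbacks
    @callbacks ||= []
  end
end

hooks = ResumeHooks.new
calls = []
handler = proc { calls << :resumed } # keep a reference so it can be removed

hooks.on_resume(&handler)
hooks.resumed                        # handler runs
hooks.off_resume(&handler)
hooks.resumed                        # nothing is registered any more
puts calls.inspect
```

A block literal passed directly to `on_resume` cannot be removed later, because no second reference to that Proc exists.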
#send_protocol_message(protocol_message) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Add protocol message to the outgoing message queue and notify the dispatcher that a message is ready to be sent
# File 'lib/ably/realtime/connection.rb', line 361 def () () do Ably::Models::ProtocolMessage.new(, logger: logger).tap do || logger.debug("Connection: Prot msg queued =>: #{.action} #{}") end end end |
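According to the private helpers documented with the class source, messages that require an ACK are given the next client serial before being queued, and the serial is rolled back if queueing raises. The sketch below shows that increment-and-rollback pattern on its own. `SerialCounter` and `with_next_serial` are hypothetical names, and a plain Hash stands in for the protocol message.

```ruby
# Hypothetical stand-alone model of assigning msgSerial with rollback.
class SerialCounter
  attr_reader :client_serial

  def initialize
    @client_serial = -1 # -1 means no messages have been sent yet
  end

  # Assign the next serial to the message, then yield it for queueing.
  # If the block raises, undo the increment so no serial is skipped.
  def with_next_serial(message)
    @client_serial += 1
    message[:msgSerial] = @client_serial
    yield message
  rescue StandardError
    @client_serial -= 1
    raise
  end
end

counter = SerialCounter.new
counter.with_next_serial({ action: :message }) { |message| puts message.inspect }
puts counter.client_serial
```

Rolling back on failure keeps the serials contiguous, so the next message to be queued successfully reuses the serial of the one that failed.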
#set_connection_details(connection_details) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection.rb', line 445

def set_connection_details(connection_details)
  @details = connection_details
end
#set_failed_connection_error_reason(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/connection.rb', line 435

def set_failed_connection_error_reason(error)
  @error_reason = error
end
#update_connection_serial(connection_serial) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Store last received connection serial so that the connection can be resumed from the last known point-in-time
# File 'lib/ably/realtime/connection.rb', line 291

def update_connection_serial(connection_serial)
  @serial = connection_serial
end