Class: Ably::Realtime::Connection

Inherits:
Object
  • Object
show all
Extended by:
Modules::Enum
Includes:
Modules::Conversions, Modules::EventEmitter, Modules::SafeYield, Modules::StateEmitter, Modules::UsesStateMachine
Defined in:
lib/ably/realtime/connection.rb,
lib/ably/realtime/connection/connection_manager.rb,
lib/ably/realtime/connection/websocket_transport.rb,
lib/ably/realtime/connection/connection_state_machine.rb

Overview

The Connection class represents the connection associated with an Ably Realtime instance. The Connection object exposes the lifecycle and parameters of the realtime connection.

Connections will always be in one of the following states:

initialized:  0
connecting:   1
connected:    2
disconnected: 3
suspended:    4
closing:      5
closed:       6
failed:       7

Note that the states are available as Enum-like constants:

Connection::STATE.Initialized
Connection::STATE.Connecting
Connection::STATE.Connected
Connection::STATE.Disconnected
Connection::STATE.Suspended
Connection::STATE.Closing
Connection::STATE.Closed
Connection::STATE.Failed

Connection emit errors - use β€˜on(:error)` to subscribe to errors

Examples:

client = Ably::Realtime::Client.new('key.id:secret')
client.connection.on(:connected) do
  puts "Connected with connection ID: #{client.connection.id}"
end

Defined Under Namespace

Classes: ConnectionManager, ConnectionStateMachine, WebsocketTransport

Constant Summary collapse

STATE =

Valid Connection states

ruby_enum('STATE',
  :initialized,
  :connecting,
  :connected,
  :disconnected,
  :suspended,
  :closing,
  :closed,
  :failed
)
RECOVER_REGEX =

Expected format for a connection recover key

/^(?<recover>[\w!-]+):(?<connection_serial>\-?\w+)$/
DEFAULTS =

Defaults for automatic connection recovery and timeouts

{
  disconnected_retry_timeout: 15, # when the connection enters the DISCONNECTED state, after this delay in milliseconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
  suspended_retry_timeout:    30, # when the connection enters the SUSPENDED state, after this delay in milliseconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically
  connection_state_ttl:       60, # the duration that Ably will persist the connection state when a Realtime client is abruptly disconnected
  realtime_request_timeout:   10  # default timeout when establishing a connection, or sending a HEARTBEAT, CONNECT, ATTACH, DETACH or CLOSE ProtocolMessage
}.freeze

Instance Attribute Summary collapse

Attributes included from Modules::UsesStateMachine

#previous_state, #state_history

Instance Method Summary collapse

Methods included from Modules::UsesStateMachine

#synchronize_state_with_statemachine, #transition_state_machine, #transition_state_machine!

Methods included from Modules::StateEmitter

#once_or_if, #once_state_changed, #state=, #state?, #unsafe_once_or_if, #unsafe_once_state_changed

Methods included from Modules::EventEmitter

#emit, #off, #on, #once, #unsafe_on, #unsafe_once

Constructor Details

#initialize(client, options) ⇒ Connection

Returns a new instance of Connection.



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/ably/realtime/connection.rb', line 122

def initialize(client, options)
  @client                        = client
  @client_serial                 = -1
  @__outgoing_message_queue__    = []
  @__pending_message_ack_queue__ = []

  @defaults = DEFAULTS.dup
  options.each do |key, val|
    @defaults[key] = val if DEFAULTS.has_key?(key)
  end if options.kind_of?(Hash)
  @defaults.freeze

  Client::IncomingMessageDispatcher.new client, self
  Client::OutgoingMessageDispatcher.new client, self

  @state_machine = ConnectionStateMachine.new(self)
  @state         = STATE(state_machine.current_state)
  @manager       = ConnectionManager.new(self)
end

Instance Attribute Details

#__incoming_protocol_msgbus__Ably::Util::PubSub (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns Client library internal incoming protocol message bus.

Returns:



313
314
315
# File 'lib/ably/realtime/connection.rb', line 313

def __incoming_protocol_msgbus__
  @__incoming_protocol_msgbus__ ||= create_pub_sub_message_bus
end

#__outgoing_message_queue__Array (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

An internal queue used to manage unsent outgoing messages. You should never interface with this array directly

Returns:

  • (Array)


108
109
110
# File 'lib/ably/realtime/connection.rb', line 108

def __outgoing_message_queue__
  @__outgoing_message_queue__
end

#__outgoing_protocol_msgbus__Ably::Util::PubSub (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns Client library internal outgoing protocol message bus.

Returns:



306
307
308
# File 'lib/ably/realtime/connection.rb', line 306

def __outgoing_protocol_msgbus__
  @__outgoing_protocol_msgbus__ ||= create_pub_sub_message_bus
end

#__pending_message_ack_queue__Array (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

An internal queue used to manage sent messages. You should never interface with this array directly

Returns:

  • (Array)


113
114
115
# File 'lib/ably/realtime/connection.rb', line 113

def __pending_message_ack_queue__
  @__pending_message_ack_queue__
end

#clientAbly::Realtime::Client (readonly)

Ably::Realtime::Client associated with this connection



93
94
95
# File 'lib/ably/realtime/connection.rb', line 93

def client
  @client
end

#current_hostString (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns The current host that is configured following a call to method #determine_host.

Returns:

  • (String)

    The current host that is configured following a call to method #determine_host



340
341
342
# File 'lib/ably/realtime/connection.rb', line 340

def current_host
  @current_host
end

#defaultsHash (readonly)

Configured recovery and timeout defaults for this Ably::Realtime::Connection. See the configurable options in Ably::Realtime::Client#initialize. The defaults are immutable

Returns:

  • (Hash)


119
120
121
# File 'lib/ably/realtime/connection.rb', line 119

def defaults
  @defaults
end

#detailsAbly::Models::ConnectionDetails (readonly)

Connection details of the currently established connection



89
90
91
# File 'lib/ably/realtime/connection.rb', line 89

def details
  @details
end

#error_reasonAbly::Models::ErrorInfo, Ably::Exceptions::BaseAblyException (readonly)

When a connection failure occurs this attribute contains the Ably Exception



85
86
87
# File 'lib/ably/realtime/connection.rb', line 85

def error_reason
  @error_reason
end

#idString (readonly)

A unique public identifier for this connection, used to identify this member in presence events and messages

Returns:

  • (String)


73
74
75
# File 'lib/ably/realtime/connection.rb', line 73

def id
  @id
end

#keyString (readonly)

A unique private connection key used to recover this connection, assigned by Ably

Returns:

  • (String)


77
78
79
# File 'lib/ably/realtime/connection.rb', line 77

def key
  @key
end

#loggerLogger (readonly)

Returns The Logger for this client. Configure the log_level with the β€˜:log_level` option, refer to Ably::Realtime::Client#initialize.

Returns:



351
352
353
# File 'lib/ably/realtime/connection.rb', line 351

def logger
  client.logger
end

#managerAbly::Realtime::Connection::ConnectionManager (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The Connection manager responsible for creating, maintaining and closing the connection and underlying transport



103
104
105
# File 'lib/ably/realtime/connection.rb', line 103

def manager
  @manager
end

#portInteger (readonly)

Returns The default port used for this connection.

Returns:

  • (Integer)

    The default port used for this connection



344
345
346
# File 'lib/ably/realtime/connection.rb', line 344

def port
  client.use_tls? ? client.custom_tls_port || 443 : client.custom_port || 80
end

#recovery_keyString (readonly)

Returns recovery key that can be used by another client to recover this connection with the :recover option.

Returns:

  • (String)

    recovery key that can be used by another client to recover this connection with the :recover option



271
272
273
# File 'lib/ably/realtime/connection.rb', line 271

def recovery_key
  "#{key}:#{serial}" if connection_resumable?
end

#serialInteger (readonly)

The serial number of the last message to be received on this connection, used to recover or resume a connection

Returns:

  • (Integer)


81
82
83
# File 'lib/ably/realtime/connection.rb', line 81

def serial
  @serial
end

#stateAbly::Realtime::Connection::STATE (readonly)

Returns connection state.

Returns:



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
# File 'lib/ably/realtime/connection.rb', line 39

class Connection
  include Ably::Modules::EventEmitter
  include Ably::Modules::Conversions
  include Ably::Modules::SafeYield
  extend Ably::Modules::Enum

  # Valid Connection states
  STATE = ruby_enum('STATE',
    :initialized,
    :connecting,
    :connected,
    :disconnected,
    :suspended,
    :closing,
    :closed,
    :failed
  )
  include Ably::Modules::StateEmitter
  include Ably::Modules::UsesStateMachine
  ensure_state_machine_emits 'Ably::Models::ConnectionStateChange'

  # Expected format for a connection recover key
  RECOVER_REGEX = /^(?<recover>[\w!-]+):(?<connection_serial>\-?\w+)$/

  # Defaults for automatic connection recovery and timeouts
  DEFAULTS = {
    disconnected_retry_timeout: 15, # when the connection enters the DISCONNECTED state, after this delay in milliseconds, if the state is still DISCONNECTED, the client library will attempt to reconnect automatically
    suspended_retry_timeout:    30, # when the connection enters the SUSPENDED state, after this delay in milliseconds, if the state is still SUSPENDED, the client library will attempt to reconnect automatically
    connection_state_ttl:       60, # the duration that Ably will persist the connection state when a Realtime client is abruptly disconnected
    realtime_request_timeout:   10  # default timeout when establishing a connection, or sending a HEARTBEAT, CONNECT, ATTACH, DETACH or CLOSE ProtocolMessage
  }.freeze

  # A unique public identifier for this connection, used to identify this member in presence events and messages
  # @return [String]
  attr_reader :id

  # A unique private connection key used to recover this connection, assigned by Ably
  # @return [String]
  attr_reader :key

  # The serial number of the last message to be received on this connection, used to recover or resume a connection
  # @return [Integer]
  attr_reader :serial

  # When a connection failure occurs this attribute contains the Ably Exception
  # @return [Ably::Models::ErrorInfo,Ably::Exceptions::BaseAblyException]
  attr_reader :error_reason

  # Connection details of the currently established connection
  # @return [Ably::Models::ConnectionDetails]
  attr_reader :details

  # {Ably::Realtime::Client} associated with this connection
  # @return [Ably::Realtime::Client]
  attr_reader :client

  # Underlying socket transport used for this connection, for internal use by the client library
  # @return [Ably::Realtime::Connection::WebsocketTransport]
  # @api private
  attr_reader :transport

  # The Connection manager responsible for creating, maintaining and closing the connection and underlying transport
  # @return [Ably::Realtime::Connection::ConnectionManager]
  # @api private
  attr_reader :manager

  # An internal queue used to manage unsent outgoing messages. You should never interface with this array directly
  # @return [Array]
  # @api private
  attr_reader :__outgoing_message_queue__

  # An internal queue used to manage sent messages. You should never interface with this array directly
  # @return [Array]
  # @api private
  attr_reader :__pending_message_ack_queue__

  # Configured recovery and timeout defaults for this {Connection}.
  # See the configurable options in {Ably::Realtime::Client#initialize}.
  # The defaults are immutable
  # @return [Hash]
  attr_reader :defaults

  # @api public
  def initialize(client, options)
    @client                        = client
    @client_serial                 = -1
    @__outgoing_message_queue__    = []
    @__pending_message_ack_queue__ = []

    @defaults = DEFAULTS.dup
    options.each do |key, val|
      @defaults[key] = val if DEFAULTS.has_key?(key)
    end if options.kind_of?(Hash)
    @defaults.freeze

    Client::IncomingMessageDispatcher.new client, self
    Client::OutgoingMessageDispatcher.new client, self

    @state_machine = ConnectionStateMachine.new(self)
    @state         = STATE(state_machine.current_state)
    @manager       = ConnectionManager.new(self)
  end

  # Causes the connection to close, entering the closed state, from any state except
  # the failed state. Once closed, the library will not attempt to re-establish the
  # connection without a call to {Connection#connect}.
  #
  # @yield block is called as soon as this connection is in the Closed state
  #
  # @return [EventMachine::Deferrable]
  #
  def close(&success_block)
    unless closing? || closed?
      raise exception_for_state_change_to(:closing) unless can_transition_to?(:closing)
      transition_state_machine :closing
    end
    deferrable_for_state_change_to(STATE.Closed, &success_block)
  end

  # Causes the library to attempt connection.  If it was previously explicitly
  # closed by the user, or was closed as a result of an unrecoverable error, a new connection will be opened.
  # Succeeds when connection is established i.e. state is @Connected@
  # Fails when state becomes either @Closing@, @Closed@ or @Failed@
  #
  # Note that if the connection remains in the disconnected ans suspended states indefinitely,
  # the Deferrable or block provided may never be called
  #
  # @yield block is called as soon as this connection is in the Connected state
  #
  # @return [EventMachine::Deferrable]
  #
  def connect(&success_block)
    unless connecting? || connected?
      raise exception_for_state_change_to(:connecting) unless can_transition_to?(:connecting)
      transition_state_machine :connecting
    end

    Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
      deferrable.callback do
        yield if block_given?
      end
      succeed_callback = deferrable.method(:succeed)
      fail_callback    = deferrable.method(:fail)

      once(:connected) do
        deferrable.succeed
        off(&fail_callback)
      end

      once(:failed, :closed, :closing) do
        deferrable.fail
        off(&succeed_callback)
      end
    end
  end

  # Sends a ping to Ably and yields the provided block when a heartbeat ping request is echoed from the server.
  # This can be useful for measuring true roundtrip client to Ably server latency for a simple message, or checking that an underlying transport is responding currently.
  # The elapsed milliseconds is passed as an argument to the block and represents the time taken to echo a ping heartbeat once the connection is in the `:connected` state.
  #
  # @yield [Integer] if a block is passed to this method, then this block will be called once the ping heartbeat is received with the time elapsed in milliseconds.
  #                  If the ping is not received within an acceptable timeframe, the block will be called with +nil+ as he first argument
  #
  # @example
  #    client = Ably::Rest::Client.new(key: 'key.id:secret')
  #    client.connection.ping do |ms_elapsed|
  #      puts "Ping took #{ms_elapsed}ms"
  #    end
  #
  # @return [void]
  #
  def ping(&block)
    raise RuntimeError, 'Cannot send a ping when connection is not open' if initialized?
    raise RuntimeError, 'Cannot send a ping when connection is in a closed or failed state' if closed? || failed?

    started = nil
    finished = false

    wait_for_ping = Proc.new do |protocol_message|
      next if finished
      if protocol_message.action == Ably::Models::ProtocolMessage::ACTION.Heartbeat
        finished = true
        __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
        time_passed = (Time.now.to_f * 1000 - started.to_f * 1000).to_i
        safe_yield block, time_passed if block_given?
      end
    end

    once_or_if(STATE.Connected) do
      next if finished
      started = Time.now
      send_protocol_message action: Ably::Models::ProtocolMessage::ACTION.Heartbeat.to_i
      __incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping
    end

    EventMachine.add_timer(defaults.fetch(:realtime_request_timeout)) do
      next if finished
      finished = true
      __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
      logger.warn "Ping timed out after #{defaults.fetch(:realtime_request_timeout)}s"
      safe_yield block, nil if block_given?
    end
  end

  # @yield [Boolean] True if an internet connection check appears to be up following an HTTP request to a reliable CDN
  # @return [EventMachine::Deferrable]
  # @api private
  def internet_up?
    url = "http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}"
    EventMachine::DefaultDeferrable.new.tap do |deferrable|
      EventMachine::HttpRequest.new(url).get.tap do |http|
        http.errback do
          yield false if block_given?
          deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unable to connect to #{url}", nil, 80000)
        end
        http.callback do
          EventMachine.next_tick do
            result = http.response_header.status == 200 && http.response.strip == Ably::INTERNET_CHECK.fetch(:ok_text)
            yield result if block_given?
            if result
              deferrable.succeed
            else
              deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unexpected response from #{url} (#{http.response_header.status})", 400, 40000)
            end
          end
        end
      end
    end
  end

  # @!attribute [r] recovery_key
  # @return [String] recovery key that can be used by another client to recover this connection with the :recover option
  def recovery_key
    "#{key}:#{serial}" if connection_resumable?
  end

  # Following a new connection being made, the connection ID, connection key
  # and message serial need to match the details provided by the server.
  #
  # @return [void]
  # @api private
  def configure_new(connection_id, connection_key, connection_serial)
    @id            = connection_id
    @key           = connection_key
    @client_serial = connection_serial

    update_connection_serial connection_serial
  end

  # Store last received connection serial so that the connection can be resumed from the last known point-in-time
  # @return [void]
  # @api private
  def update_connection_serial(connection_serial)
    @serial = connection_serial
  end

  # Disable automatic resume of a connection
  # @return [void]
  # @api private
  def reset_resume_info
    @key    = nil
    @serial = nil
  end

  # @!attribute [r] __outgoing_protocol_msgbus__
  # @return [Ably::Util::PubSub] Client library internal outgoing protocol message bus
  # @api private
  def __outgoing_protocol_msgbus__
    @__outgoing_protocol_msgbus__ ||= create_pub_sub_message_bus
  end

  # @!attribute [r] __incoming_protocol_msgbus__
  # @return [Ably::Util::PubSub] Client library internal incoming protocol message bus
  # @api private
  def __incoming_protocol_msgbus__
    @__incoming_protocol_msgbus__ ||= create_pub_sub_message_bus
  end

  # Determines the correct host name to use for the next connection attempt and updates current_host
  # @yield [String] The host name used for this connection, for network connection failures a {Ably::FALLBACK_HOSTS fallback host} is used to route around networking or intermittent problems if an Internet connection is available
  # @api private
  def determine_host
    raise ArgumentError, 'Block required' unless block_given?

    if can_use_fallback_hosts?
      internet_up? do |internet_is_up_result|
        @current_host = if internet_is_up_result
          client.fallback_endpoint.host
        else
          client.endpoint.host
        end
        yield current_host
      end
    else
      @current_host = client.endpoint.host
      yield current_host
    end
  end

  # @return [String] The current host that is configured following a call to method {#determine_host}
  # @api private
  attr_reader :current_host

  # @!attribute [r] port
  # @return [Integer] The default port used for this connection
  def port
    client.use_tls? ? client.custom_tls_port || 443 : client.custom_port || 80
  end

  # @!attribute [r] logger
  # @return [Logger] The {Ably::Logger} for this client.
  #                  Configure the log_level with the `:log_level` option, refer to {Ably::Realtime::Client#initialize}
  def logger
    client.logger
  end

  # Add protocol message to the outgoing message queue and notify the dispatcher that a message is
  # ready to be sent
  #
  # @param [Ably::Models::ProtocolMessage] protocol_message
  # @return [void]
  # @api private
  def send_protocol_message(protocol_message)
    add_message_serial_if_ack_required_to(protocol_message) do
      Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |message|
        add_message_to_outgoing_queue message
        notify_message_dispatcher_of_new_message message
        logger.debug("Connection: Prot msg queued =>: #{message.action} #{message}")
      end
    end
  end

  # @api private
  def add_message_to_outgoing_queue(protocol_message)
    __outgoing_message_queue__ << protocol_message
  end

  # @api private
  def notify_message_dispatcher_of_new_message(protocol_message)
    __outgoing_protocol_msgbus__.publish :protocol_message, protocol_message
  end

  # @return [EventMachine::Deferrable]
  # @api private
  def create_websocket_transport
    EventMachine::DefaultDeferrable.new.tap do |websocket_deferrable|
      # Getting auth params can be blocking so uses a Deferrable
      client.auth.auth_params.tap do |auth_deferrable|
        auth_deferrable.callback do |auth_params|
          url_params = auth_params.merge(
            format:    client.protocol,
            echo:      client.echo_messages
          )

          url_params['clientId'] = client.auth.client_id if client.auth.has_client_id?

          if connection_resumable?
            url_params.merge! resume: key, connection_serial: serial
            logger.debug "Resuming connection key #{key} with serial #{serial}"
          elsif connection_recoverable?
            url_params.merge! recover: connection_recover_parts[:recover], connection_serial: connection_recover_parts[:connection_serial]
            logger.debug "Recovering connection with key #{client.recover}"
            once(:connected, :closed, :failed) do
              client.disable_automatic_connection_recovery
            end
          end

          url = URI(client.endpoint).tap do |endpoint|
            endpoint.query = URI.encode_www_form(url_params)
          end.to_s

          determine_host do |host|
            begin
              logger.debug "Connection: Opening socket connection to #{host}:#{port} and URL '#{url}'"
              @transport = EventMachine.connect(host, port, WebsocketTransport, self, url) do |websocket_transport|
                websocket_deferrable.succeed websocket_transport
              end
            rescue EventMachine::ConnectionError => error
              websocket_deferrable.fail error
            end
          end
        end

        auth_deferrable.errback do |error|
          websocket_deferrable.fail error
        end
      end
    end
  end

  # @api private
  def release_websocket_transport
    @transport = nil
  end

  # @api private
  def set_failed_connection_error_reason(error)
    @error_reason = error
  end

  # @api private
  def clear_error_reason
    @error_reason = nil
  end

  # @api private
  def set_connection_details(connection_details)
    @details = connection_details
  end

  # Executes registered callbacks for a successful connection resume event
  # @api private
  def resumed
    resume_callbacks.each(&:call)
  end

  # Provides a simple hook to inject a callback when a connection is successfully resumed
  # @api private
  def on_resume(&callback)
    resume_callbacks << callback
  end

  # Remove a registered connection resume callback
  # @api private
  def off_resume(&callback)
    resume_callbacks.delete(callback)
  end

  # Returns false if messages cannot be published as a result of message queueing being disabled
  # @api private
  def can_publish_messages?
    connected? ||
      ( (initialized? || connecting? || disconnected?) && client.queue_messages )
  end

  # As we are using a state machine, do not allow change_state to be used
  # #transition_state_machine must be used instead
  private :change_state

  private

  # The client serial is incremented for every message that is published that requires an ACK.
  # Note that this is different to the connection serial that contains the last known serial number
  # received from the server.
  #
  # A client serial number therefore does not guarantee a message has been received, only sent.
  # A connection serial guarantees the server has received the message and is thus used for connection
  # recovery and resumes.
  # @return [Integer] starting at -1 indicating no messages sent, 0 when the first message is sent
  def client_serial
    @client_serial
  end

  def resume_callbacks
    @resume_callbacks ||= []
  end

  def create_pub_sub_message_bus
    Ably::Util::PubSub.new(
      coerce_into: Proc.new do |event|
        raise KeyError, "Expected :protocol_message, :#{event} is disallowed" unless event == :protocol_message
        :protocol_message
      end
    )
  end

  def add_message_serial_if_ack_required_to(protocol_message)
    if Ably::Models::ProtocolMessage.ack_required?(protocol_message[:action])
      add_message_serial_to(protocol_message) { yield }
    else
      yield
    end
  end

  def add_message_serial_to(protocol_message)
    @client_serial += 1
    protocol_message[:msgSerial] = client_serial
    yield
  rescue StandardError => e
    @client_serial -= 1
    raise e
  end

  # Simply wait until the next EventMachine tick to ensure Connection initialization is complete
  def when_initialized
    EventMachine.next_tick { yield }
  end

  def connection_resumable?
    !key.nil? && !serial.nil?
  end

  def connection_recoverable?
    connection_recover_parts
  end

  def connection_recover_parts
    client.recover.to_s.match(RECOVER_REGEX)
  end

  def production?
    client.environment.nil? || client.environment == :production
  end

  def custom_port?
    if client.use_tls?
      !!client.custom_tls_port
    else
      !!client.custom_port
    end
  end

  def custom_host?
    !!client.custom_realtime_host
  end

  def can_use_fallback_hosts?
    if production? && !custom_port? && !custom_host?
      if connecting? && previous_state
        use_fallback_if_disconnected? || use_fallback_if_suspended?
      end
    end
  end

  def use_fallback_if_disconnected?
    second_reconnect_attempt_for(:disconnected, 1)
  end

  def use_fallback_if_suspended?
    second_reconnect_attempt_for(:suspended, 2) # on first suspended state use default Ably host again
  end

  def second_reconnect_attempt_for(state, first_attempt_count)
    previous_state == state && manager.retry_count_for_state(state) >= first_attempt_count
  end
end

#transportAbly::Realtime::Connection::WebsocketTransport (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Underlying socket transport used for this connection, for internal use by the client library



98
99
100
# File 'lib/ably/realtime/connection.rb', line 98

def transport
  @transport
end

Instance Method Details

#add_message_to_outgoing_queue(protocol_message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



372
373
374
# File 'lib/ably/realtime/connection.rb', line 372

def add_message_to_outgoing_queue(protocol_message)
  __outgoing_message_queue__ << protocol_message
end

#can_publish_messages?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns false if messages cannot be published as a result of message queueing being disabled

Returns:

  • (Boolean)


469
470
471
472
# File 'lib/ably/realtime/connection.rb', line 469

def can_publish_messages?
  connected? ||
    ( (initialized? || connecting? || disconnected?) && client.queue_messages )
end

#clear_error_reasonObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



440
441
442
# File 'lib/ably/realtime/connection.rb', line 440

def clear_error_reason
  @error_reason = nil
end

#close { ... } ⇒ EventMachine::Deferrable

Causes the connection to close, entering the closed state, from any state except the failed state. Once closed, the library will not attempt to re-establish the connection without a call to #connect.

Yields:

  • block is called as soon as this connection is in the Closed state

Returns:

  • (EventMachine::Deferrable)


150
151
152
153
154
155
156
# File 'lib/ably/realtime/connection.rb', line 150

def close(&success_block)
  unless closing? || closed?
    raise exception_for_state_change_to(:closing) unless can_transition_to?(:closing)
    transition_state_machine :closing
  end
  deferrable_for_state_change_to(STATE.Closed, &success_block)
end

#configure_new(connection_id, connection_key, connection_serial) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Following a new connection being made, the connection ID, connection key and message serial need to match the details provided by the server.



280
281
282
283
284
285
286
# File 'lib/ably/realtime/connection.rb', line 280

def configure_new(connection_id, connection_key, connection_serial)
  @id            = connection_id
  @key           = connection_key
  @client_serial = connection_serial

  update_connection_serial connection_serial
end

#connect { ... } ⇒ EventMachine::Deferrable

Causes the library to attempt connection. If it was previously explicitly closed by the user, or was closed as a result of an unrecoverable error, a new connection will be opened. Succeeds when connection is established i.e. state is @Connected@ Fails when state becomes either @Closing@, @Closed@ or @Failed@

Note that if the connection remains in the disconnected ans suspended states indefinitely, the Deferrable or block provided may never be called

Yields:

  • block is called as soon as this connection is in the Connected state

Returns:

  • (EventMachine::Deferrable)


170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/ably/realtime/connection.rb', line 170

def connect(&success_block)
  unless connecting? || connected?
    raise exception_for_state_change_to(:connecting) unless can_transition_to?(:connecting)
    transition_state_machine :connecting
  end

  Ably::Util::SafeDeferrable.new(logger).tap do |deferrable|
    deferrable.callback do
      yield if block_given?
    end
    succeed_callback = deferrable.method(:succeed)
    fail_callback    = deferrable.method(:fail)

    once(:connected) do
      deferrable.succeed
      off(&fail_callback)
    end

    once(:failed, :closed, :closing) do
      deferrable.fail
      off(&succeed_callback)
    end
  end
end

#create_websocket_transportEventMachine::Deferrable

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (EventMachine::Deferrable)


383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
# File 'lib/ably/realtime/connection.rb', line 383

def create_websocket_transport
  EventMachine::DefaultDeferrable.new.tap do |websocket_deferrable|
    # Getting auth params can be blocking so uses a Deferrable
    client.auth.auth_params.tap do |auth_deferrable|
      auth_deferrable.callback do |auth_params|
        url_params = auth_params.merge(
          format:    client.protocol,
          echo:      client.echo_messages
        )

        url_params['clientId'] = client.auth.client_id if client.auth.has_client_id?

        if connection_resumable?
          url_params.merge! resume: key, connection_serial: serial
          logger.debug "Resuming connection key #{key} with serial #{serial}"
        elsif connection_recoverable?
          url_params.merge! recover: connection_recover_parts[:recover], connection_serial: connection_recover_parts[:connection_serial]
          logger.debug "Recovering connection with key #{client.recover}"
          once(:connected, :closed, :failed) do
            client.disable_automatic_connection_recovery
          end
        end

        url = URI(client.endpoint).tap do |endpoint|
          endpoint.query = URI.encode_www_form(url_params)
        end.to_s

        determine_host do |host|
          begin
            logger.debug "Connection: Opening socket connection to #{host}:#{port} and URL '#{url}'"
            @transport = EventMachine.connect(host, port, WebsocketTransport, self, url) do |websocket_transport|
              websocket_deferrable.succeed websocket_transport
            end
          rescue EventMachine::ConnectionError => error
            websocket_deferrable.fail error
          end
        end
      end

      auth_deferrable.errback do |error|
        websocket_deferrable.fail error
      end
    end
  end
end

#determine_host {|String| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Determines the correct host name to use for the next connection attempt and updates current_host

Yields:

  • (String)

    The host name used for this connection, for network connection failures a fallback host is used to route around networking or intermittent problems if an Internet connection is available

Raises:

  • (ArgumentError)


320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/ably/realtime/connection.rb', line 320

def determine_host
  raise ArgumentError, 'Block required' unless block_given?

  if can_use_fallback_hosts?
    internet_up? do |internet_is_up_result|
      @current_host = if internet_is_up_result
        client.fallback_endpoint.host
      else
        client.endpoint.host
      end
      yield current_host
    end
  else
    @current_host = client.endpoint.host
    yield current_host
  end
end

#internet_up? {|Boolean| ... } ⇒ EventMachine::Deferrable

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Yields:

  • (Boolean)

    True if an internet connection check appears to be up following an HTTP request to a reliable CDN

Returns:

  • (EventMachine::Deferrable)


246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
# File 'lib/ably/realtime/connection.rb', line 246

def internet_up?
  url = "http#{'s' if client.use_tls?}:#{Ably::INTERNET_CHECK.fetch(:url)}"
  EventMachine::DefaultDeferrable.new.tap do |deferrable|
    EventMachine::HttpRequest.new(url).get.tap do |http|
      http.errback do
        yield false if block_given?
        deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unable to connect to #{url}", nil, 80000)
      end
      http.callback do
        EventMachine.next_tick do
          result = http.response_header.status == 200 && http.response.strip == Ably::INTERNET_CHECK.fetch(:ok_text)
          yield result if block_given?
          if result
            deferrable.succeed
          else
            deferrable.fail Ably::Exceptions::ConnectionFailed.new("Unexpected response from #{url} (#{http.response_header.status})", 400, 40000)
          end
        end
      end
    end
  end
end

#notify_message_dispatcher_of_new_message(protocol_message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



377
378
379
# File 'lib/ably/realtime/connection.rb', line 377

def notify_message_dispatcher_of_new_message(protocol_message)
  __outgoing_protocol_msgbus__.publish :protocol_message, protocol_message
end

#off_resume(&callback) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Remove a registered connection resume callback



463
464
465
# File 'lib/ably/realtime/connection.rb', line 463

def off_resume(&callback)
  resume_callbacks.delete(callback)
end

#on_resume(&callback) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Provides a simple hook to inject a callback when a connection is successfully resumed



457
458
459
# File 'lib/ably/realtime/connection.rb', line 457

def on_resume(&callback)
  resume_callbacks << callback
end

#ping {|Integer| ... } ⇒ void

This method returns an undefined value.

Sends a ping to Ably and yields the provided block when a heartbeat ping request is echoed from the server. This can be useful for measuring true roundtrip client to Ably server latency for a simple message, or checking that an underlying transport is responding currently. The elapsed milliseconds is passed as an argument to the block and represents the time taken to echo a ping heartbeat once the connection is in the β€˜:connected` state.

Examples:

client = Ably::Rest::Client.new(key: 'key.id:secret')
client.connection.ping do |ms_elapsed|
  puts "Ping took #{ms_elapsed}ms"
end

Yields:

  • (Integer)

    if a block is passed to this method, then this block will be called once the ping heartbeat is received with the time elapsed in milliseconds. If the ping is not received within an acceptable timeframe, the block will be called with nil as he first argument

Raises:

  • (RuntimeError)


210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/ably/realtime/connection.rb', line 210

def ping(&block)
  raise RuntimeError, 'Cannot send a ping when connection is not open' if initialized?
  raise RuntimeError, 'Cannot send a ping when connection is in a closed or failed state' if closed? || failed?

  started = nil
  finished = false

  wait_for_ping = Proc.new do |protocol_message|
    next if finished
    if protocol_message.action == Ably::Models::ProtocolMessage::ACTION.Heartbeat
      finished = true
      __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
      time_passed = (Time.now.to_f * 1000 - started.to_f * 1000).to_i
      safe_yield block, time_passed if block_given?
    end
  end

  once_or_if(STATE.Connected) do
    next if finished
    started = Time.now
    send_protocol_message action: Ably::Models::ProtocolMessage::ACTION.Heartbeat.to_i
    __incoming_protocol_msgbus__.subscribe :protocol_message, &wait_for_ping
  end

  EventMachine.add_timer(defaults.fetch(:realtime_request_timeout)) do
    next if finished
    finished = true
    __incoming_protocol_msgbus__.unsubscribe(:protocol_message, &wait_for_ping)
    logger.warn "Ping timed out after #{defaults.fetch(:realtime_request_timeout)}s"
    safe_yield block, nil if block_given?
  end
end

#release_websocket_transportObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



430
431
432
# File 'lib/ably/realtime/connection.rb', line 430

def release_websocket_transport
  @transport = nil
end

#reset_resume_infovoid

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Disable automatic resume of a connection



298
299
300
301
# File 'lib/ably/realtime/connection.rb', line 298

def reset_resume_info
  @key    = nil
  @serial = nil
end

#resumedObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Executes registered callbacks for a successful connection resume event



451
452
453
# File 'lib/ably/realtime/connection.rb', line 451

def resumed
  resume_callbacks.each(&:call)
end

#send_protocol_message(protocol_message) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Add protocol message to the outgoing message queue and notify the dispatcher that a message is ready to be sent

Parameters:



361
362
363
364
365
366
367
368
369
# File 'lib/ably/realtime/connection.rb', line 361

def send_protocol_message(protocol_message)
  add_message_serial_if_ack_required_to(protocol_message) do
    Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |message|
      add_message_to_outgoing_queue message
      notify_message_dispatcher_of_new_message message
      logger.debug("Connection: Prot msg queued =>: #{message.action} #{message}")
    end
  end
end

#set_connection_details(connection_details) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



445
446
447
# File 'lib/ably/realtime/connection.rb', line 445

def set_connection_details(connection_details)
  @details = connection_details
end

#set_failed_connection_error_reason(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



435
436
437
# File 'lib/ably/realtime/connection.rb', line 435

def set_failed_connection_error_reason(error)
  @error_reason = error
end

#update_connection_serial(connection_serial) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Store last received connection serial so that the connection can be resumed from the last known point-in-time



291
292
293
# File 'lib/ably/realtime/connection.rb', line 291

def update_connection_serial(connection_serial)
  @serial = connection_serial
end