Class: Ably::Realtime::Channel::ChannelManager Private
- Inherits: Object
  - Object
  - Ably::Realtime::Channel::ChannelManager
- Extended by: Forwardable
- Defined in: lib/ably/realtime/channel/channel_manager.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
ChannelManager is responsible for all actions relating to channel state: attaching, detaching, or failure. Channel state changes are performed by this class and executed from ChannelStateMachine.
This is a private class and should never be used directly by developers as the API is likely to change in future.
Instance Method Summary
-
#attach ⇒ Object
private
Commence attachment.
-
#attached(attached_protocol_message) ⇒ Object
private
Channel is attached, notify presence if sync is expected.
-
#detach(error, previous_state) ⇒ Object
private
Commence detachment.
-
#detached_received(reason) ⇒ Object
private
Handle DETACHED messages, see #RTL13 for server-initiated detaches.
- #drop_pending_queue_from_ack(ack_protocol_message) ⇒ Object private
- #duplicate_attached_received(protocol_message) ⇒ Object private
-
#fail_messages_awaiting_ack(error, options = {}) ⇒ Object
private
When continuity on the connection is interrupted or channel becomes suspended (implying loss of continuity) then all messages published but awaiting an ACK from Ably should be failed with a NACK.
- #fail_messages_in_queue(queue, error) ⇒ Object private
-
#fail_queued_messages(error) ⇒ Object
private
When a channel becomes suspended or failed, all queued messages should be failed immediately as we don’t queue in any of those states.
-
#initialize(channel, connection) ⇒ ChannelManager
constructor
private
A new instance of ChannelManager.
-
#log_channel_error(error) ⇒ Object
private
An error has occurred on the channel.
- #nack_message(message, error, protocol_message = nil) ⇒ Object private
- #nack_messages(protocol_message, error) ⇒ Object private
-
#notify_state_change ⇒ Object
private
RTL13c.
-
#request_reattach(reason = nil) ⇒ Object
private
Request channel to be reattached by sending an attach protocol message.
-
#start_attach_from_suspended_timer ⇒ Object
private
If the connection is still connected and the channel still suspended after channel_retry_timeout has passed, then attempt to reattach automatically, see #RTL13b.
Constructor Details
#initialize(channel, connection) ⇒ ChannelManager
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of ChannelManager.
# File 'lib/ably/realtime/channel/channel_manager.rb', line 12
def initialize(channel, connection)
  @channel = channel
  @connection = connection
end
Instance Method Details
#attach ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Commence attachment
# File 'lib/ably/realtime/channel/channel_manager.rb', line 18
def attach
  if can_transition_to?(:attached)
    connect_if_connection_initialized
    send_attach_protocol_message if connection.state?(:connected) # RTL4i
  end
end
#attached(attached_protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Channel is attached, notify presence if sync is expected
# File 'lib/ably/realtime/channel/channel_manager.rb', line 35
def attached(attached_protocol_message)
  # If no attached ProtocolMessage then this attached request was triggered by the client
  # library, such as returning to attached when detach has failed
  if attached_protocol_message
    channel.presence.manager.on_attach attached_protocol_message.has_presence_flag?
    channel.properties.set_attach_serial(attached_protocol_message.channel_serial)
    channel.options.set_modes_from_flags(attached_protocol_message.flags)
    channel.options.set_params(attached_protocol_message.params)
  end
end
#detach(error, previous_state) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Commence detachment
# File 'lib/ably/realtime/channel/channel_manager.rb', line 26
def detach(error, previous_state)
  if connection.closed? || connection.connecting? || connection.suspended?
    channel.transition_state_machine :detached, reason: error
  elsif can_transition_to?(:detached)
    send_detach_protocol_message previous_state
  end
end
#detached_received(reason) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Handle DETACHED messages, see #RTL13 for server-initiated detaches
# File 'lib/ably/realtime/channel/channel_manager.rb', line 83
def detached_received(reason)
  case channel.state.to_sym
  when :detaching
    channel.transition_state_machine :detached, reason: reason
  when :attached, :suspended
    channel.transition_state_machine :attaching, reason: reason
  else
    logger.debug { "ChannelManager: DETACHED ProtocolMessage received, but no action to take as not DETACHING, ATTACHED OR SUSPENDED" }
  end
end
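The state decision in #detached_received can be looked at in isolation. The following is a minimal sketch, not library code: `next_state_on_detached` is a hypothetical helper that takes the current channel state and returns the state the channel should move to, or nil when no transition applies.

```ruby
# Hypothetical stand-alone version of the decision made in #detached_received.
# Returns the target state, or nil when the DETACHED message requires no action.
def next_state_on_detached(current_state)
  case current_state.to_sym
  when :detaching
    :detached   # the client requested this detach, so complete it
  when :attached, :suspended
    :attaching  # server-initiated detach: attempt to reattach (see RTL13)
  end
end

next_state_on_detached(:detaching) # target is :detached
next_state_on_detached(:attached)  # target is :attaching
next_state_on_detached(:failed)    # nil, nothing to do
```

Keeping the decision as a pure function of the current state makes the three cases easy to check without a live connection.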
#drop_pending_queue_from_ack(ack_protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
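# File 'lib/ably/realtime/channel/channel_manager.rb', line 146
def drop_pending_queue_from_ack(ack_protocol_message)
  message_serial_up_to = ack_protocol_message.message_serial + ack_protocol_message.count - 1
  connection.__pending_message_ack_queue__.drop_while do |protocol_message|
    if protocol_message.message_serial <= message_serial_up_to
      yield protocol_message
      true
    end
  end
end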
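The serial arithmetic here is easy to get off by one: an ACK carrying a starting serial and a count covers serials from the start through start + count - 1. The sketch below illustrates that range on a plain Array. `PendingMessage` and `drop_acked` are hypothetical names, not part of the library, and the sketch shifts entries explicitly because `Array#drop_while` returns a new array rather than mutating its receiver.

```ruby
# Hypothetical stand-in for a queued protocol message; only the serial matters here.
PendingMessage = Struct.new(:message_serial)

# Removes every queued message covered by an ACK and yields each one in order.
# An ACK with serial s and count n covers serials s through s + n - 1.
def drop_acked(queue, ack_serial, ack_count)
  serial_up_to = ack_serial + ack_count - 1
  while (head = queue.first) && head.message_serial <= serial_up_to
    yield queue.shift
  end
  queue
end

queue = [5, 6, 7, 8].map { |serial| PendingMessage.new(serial) }
acked = []
drop_acked(queue, 5, 3) { |message| acked << message.message_serial }
# acked now holds serials 5, 6 and 7; the message with serial 8 stays queued
```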
#duplicate_attached_received(protocol_message) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/channel/channel_manager.rb', line 60
def duplicate_attached_received(protocol_message)
  logger.debug { "Server initiated ATTACHED message received for channel '#{channel.name}' with state #{channel.state}" }
  if protocol_message.error
    channel.set_channel_error_reason protocol_message.error
    log_channel_error protocol_message.error
  end

  channel.properties.set_attach_serial(protocol_message.channel_serial)
  channel.options.set_modes_from_flags(protocol_message.flags)

  unless protocol_message.has_channel_resumed_flag?
    channel.emit :update, Ably::Models::ChannelStateChange.new(
      current: channel.state,
      previous: channel.state,
      event: Ably::Realtime::Channel::EVENT(:update),
      reason: protocol_message.error,
      resumed: false,
    )
    channel.presence.manager.on_attach protocol_message.has_presence_flag?
  end
end
#fail_messages_awaiting_ack(error, options = {}) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
When continuity on the connection is interrupted or channel becomes suspended (implying loss of continuity) then all messages published but awaiting an ACK from Ably should be failed with a NACK
# File 'lib/ably/realtime/channel/channel_manager.rb', line 98
def fail_messages_awaiting_ack(error, options = {})
  immediately = options[:immediately] || false

  fail_proc = lambda do
    error = Ably::Exceptions::MessageDeliveryFailed.new("Continuity of connection was lost so published messages awaiting ACK have failed") unless error
    fail_messages_in_queue connection.__pending_message_ack_queue__, error
  end

  # Allow a short time for other queued operations to complete before failing all messages
  if immediately
    fail_proc.call
  else
    EventMachine.add_timer(0.1) { fail_proc.call }
  end
end
#fail_messages_in_queue(queue, error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/channel/channel_manager.rb', line 122
def fail_messages_in_queue(queue, error)
  queue.delete_if do |protocol_message|
    if protocol_message.action.match_any?(:presence, :message)
      if protocol_message.channel == channel.name
        nack_messages protocol_message, error
        true
      end
    end
  end
end
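The filter above removes only entries that are both a presence or message action and addressed to this channel; everything else stays queued. A minimal sketch of that selection on a plain Array follows. `QueuedMessage` and `remove_channel_messages` are hypothetical names used for illustration, and the sketch collects the removed entries instead of invoking failure callbacks.

```ruby
# Hypothetical stand-in for a protocol message with just the fields the filter reads.
QueuedMessage = Struct.new(:action, :channel)

# Removes and returns the :presence and :message entries addressed to channel_name.
# Entries for other channels, or with other actions, are left in the queue.
def remove_channel_messages(queue, channel_name)
  failed = []
  queue.delete_if do |entry|
    next false unless %i[presence message].include?(entry.action)
    next false unless entry.channel == channel_name
    failed << entry
    true
  end
  failed
end

queue = [
  QueuedMessage.new(:message, 'chat'),
  QueuedMessage.new(:attach, 'chat'),
  QueuedMessage.new(:presence, 'other'),
  QueuedMessage.new(:presence, 'chat')
]
failed = remove_channel_messages(queue, 'chat')
# failed holds the :message and :presence entries for 'chat';
# the :attach entry and the entry for 'other' remain in queue
```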
#fail_queued_messages(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
When a channel becomes suspended or failed, all queued messages should be failed immediately as we don’t queue in any of those states
# File 'lib/ably/realtime/channel/channel_manager.rb', line 117
def fail_queued_messages(error)
  error = Ably::Exceptions::MessageDeliveryFailed.new("Queued messages on channel '#{channel.name}' in state '#{channel.state}' will never be delivered") unless error
  fail_messages_in_queue connection.__outgoing_message_queue__, error
end
#log_channel_error(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
An error has occurred on the channel
# File 'lib/ably/realtime/channel/channel_manager.rb', line 47
def log_channel_error(error)
  logger.error { "ChannelManager: Channel '#{channel.name}' error: #{error}" }
end
#nack_message(message, error, protocol_message = nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/ably/realtime/channel/channel_manager.rb', line 141
def nack_message(message, error, protocol_message = nil)
  logger.debug { "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json} #{"protocol message: #{protocol_message}" if protocol_message}" }
  message.fail error
end
#nack_messages(protocol_message, error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
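# File 'lib/ably/realtime/channel/channel_manager.rb', line 133
def nack_messages(protocol_message, error)
  (protocol_message.messages + protocol_message.presence).each do |message|
    nack_message message, error, protocol_message
  end
  logger.debug { "Calling NACK failure callbacks for #{protocol_message.class.name} - #{protocol_message.to_json}" }
  protocol_message.fail error
end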
#notify_state_change ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
RTL13c
# File 'lib/ably/realtime/channel/channel_manager.rb', line 171
def notify_state_change
  @pending_state_change_timer.cancel if @pending_state_change_timer
  @pending_state_change_timer = nil
end
#request_reattach(reason = nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Request channel to be reattached by sending an attach protocol message
# File 'lib/ably/realtime/channel/channel_manager.rb', line 53
def request_reattach(reason = nil)
  channel.set_channel_error_reason(reason) if reason
  channel.transition_state_machine! :attaching, reason: reason unless channel.attaching?
  logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" }
end
#start_attach_from_suspended_timer ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
If the connection is still connected and the channel still suspended after channel_retry_timeout has passed, then attempt to reattach automatically, see #RTL13b
# File 'lib/ably/realtime/channel/channel_manager.rb', line 158
def start_attach_from_suspended_timer
  cancel_attach_from_suspended_timer
  if connection.connected?
    channel.unsafe_once { |event| cancel_attach_from_suspended_timer unless event == :update }
    connection.unsafe_once { |event| cancel_attach_from_suspended_timer unless event == :update }

    @attach_from_suspended_timer = EventMachine::Timer.new(channel_retry_timeout) do
      channel.transition_state_machine! :attaching
    end
  end
end