Class: Ably::Realtime::Channel
- Inherits:
-
Object
- Object
- Ably::Realtime::Channel
- Extended by:
- Modules::Enum
- Includes:
- Modules::AsyncWrapper, Modules::Conversions, Modules::EventEmitter, Modules::EventMachineHelpers, Modules::MessageEmitter, Modules::StateEmitter, Modules::UsesStateMachine
- Defined in:
- lib/ably/realtime/channel.rb,
lib/ably/realtime/channel/channel_manager.rb,
lib/ably/realtime/channel/channel_state_machine.rb
Overview
The Channel class represents a Channel belonging to this application. The Channel instance allows messages to be published and received, and controls the lifecycle of this instance’s attachment to the channel.
Channels will always be in one of the following states:
initialized: 0
attaching: 1
attached: 2
detaching: 3
detached: 4
failed: 5
Note that the states are available as Enum-like constants:
Channel::STATE.Initialized
Channel::STATE.Attaching
Channel::STATE.Attached
Channel::STATE.Detaching
Channel::STATE.Detached
Channel::STATE.Failed
Channels emit errors - use on(:error) to subscribe to errors
Defined Under Namespace
Classes: ChannelManager, ChannelStateMachine
Constant Summary collapse
- STATE =
ruby_enum('STATE', :initialized, :attaching, :attached, :detaching, :detached, :failed )
- MAX_PROTOCOL_MESSAGE_BATCH_SIZE =
Max number of messages to bundle in a single ProtocolMessage
50
Instance Attribute Summary collapse
-
#__incoming_msgbus__ ⇒ Ably::Util::PubSub
readonly
private
Client library internal channel incoming message bus.
-
#attached_serial ⇒ Integer
readonly
private
Serial number assigned to this channel when it was attached.
-
#client ⇒ Ably::Realtime::Client
readonly
Client associated with this channel.
-
#error_reason ⇒ Ably::Models::ErrorInfo, Ably::Exceptions::BaseAblyException
readonly
When a channel failure occurs this attribute contains the Ably Exception.
-
#manager ⇒ Ably::Realtime::Channel::ChannelManager
readonly
private
The Channel manager responsible for attaching, detaching and handling failures for this channel.
-
#name ⇒ String
readonly
Channel name.
-
#options ⇒ Hash
readonly
Channel options configured for this channel, see #initialize for channel_options.
-
#state ⇒ Ably::Realtime::Connection::STATE
readonly
Channel state.
Attributes included from Modules::UsesStateMachine
#previous_state, #state_history
Instance Method Summary collapse
-
#attach {|Ably::Realtime::Channel| ... } ⇒ Ably::Util::SafeDeferrable
Attach to this channel, and call the block if provided when attached.
- #clear_error_reason ⇒ Object private
-
#detach {|Ably::Realtime::Channel| ... } ⇒ Ably::Util::SafeDeferrable
Detach this channel, and call the block if provided when in a Detached or Failed state.
-
#history(options = {}) {|Ably::Models::PaginatedResult<Ably::Models::Message>| ... } ⇒ Ably::Util::SafeDeferrable
Return the message history of the channel.
-
#initialize(client, name, channel_options = {}) ⇒ Channel
constructor
Initialize a new Channel object.
-
#logger ⇒ Object
private
Used by Modules::StateEmitter to debug state changes.
-
#presence ⇒ Ably::Realtime::Presence
Presence object for this Channel.
-
#publish(name, data = nil, attributes = {}) {|Ably::Models::Message, Array<Ably::Models::Message>| ... } ⇒ Ably::Util::SafeDeferrable
Publish one or more messages to the channel.
- #set_attached_serial(serial) ⇒ Object private
- #set_failed_channel_error_reason(error) ⇒ Object private
-
#subscribe(*names) {|Ably::Models::Message| ... } ⇒ void
Subscribe to messages matching providing event name, or all messages if event name not provided.
-
#unsubscribe(*names, &callback) ⇒ void
Unsubscribe the matching block for messages matching providing event name, or all messages if event name not provided.
- #update_options(channel_options) ⇒ Object private
Methods included from Modules::UsesStateMachine
#synchronize_state_with_statemachine, #transition_state_machine, #transition_state_machine!
Methods included from Modules::StateEmitter
#once_or_if, #once_state_changed, #state=, #state?, #unsafe_once_or_if, #unsafe_once_state_changed
Methods included from Modules::MessageEmitter
Methods included from Modules::EventEmitter
#emit, #off, #on, #once, #unsafe_on, #unsafe_once
Constructor Details
#initialize(client, name, channel_options = {}) ⇒ Channel
Initialize a new Channel object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/ably/realtime/channel.rb', line 87 def initialize(client, name, = {}) ensure_utf_8 :name, name @client = client @name = name @queue = [] @state_machine = ChannelStateMachine.new(self) @state = STATE(state_machine.current_state) @manager = ChannelManager.new(self, client.connection) setup_event_handlers setup_presence end |
Instance Attribute Details
#__incoming_msgbus__ ⇒ Ably::Util::PubSub (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns Client library internal channel incoming message bus.
259 260 261 262 263 |
# File 'lib/ably/realtime/channel.rb', line 259 def __incoming_msgbus__ @__incoming_msgbus__ ||= Ably::Util::PubSub.new( coerce_into: Proc.new { |event| Ably::Models::ProtocolMessage::ACTION(event) } ) end |
#attached_serial ⇒ Integer (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Serial number assigned to this channel when it was attached
78 79 80 |
# File 'lib/ably/realtime/channel.rb', line 78 def attached_serial @attached_serial end |
#client ⇒ Ably::Realtime::Client (readonly)
Ably::Realtime::Client associated with this channel
56 57 58 |
# File 'lib/ably/realtime/channel.rb', line 56 def client @client end |
#error_reason ⇒ Ably::Models::ErrorInfo, Ably::Exceptions::BaseAblyException (readonly)
When a channel failure occurs this attribute contains the Ably Exception
68 69 70 |
# File 'lib/ably/realtime/channel.rb', line 68 def error_reason @error_reason end |
#manager ⇒ Ably::Realtime::Channel::ChannelManager (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The Channel manager responsible for attaching, detaching and handling failures for this channel
73 74 75 |
# File 'lib/ably/realtime/channel.rb', line 73 def manager @manager end |
#name ⇒ String (readonly)
Channel name
60 61 62 |
# File 'lib/ably/realtime/channel.rb', line 60 def name @name end |
#options ⇒ Hash (readonly)
Channel options configured for this channel, see #initialize for channel_options
64 65 66 |
# File 'lib/ably/realtime/channel.rb', line 64 def @options end |
#state ⇒ Ably::Realtime::Connection::STATE (readonly)
Returns channel state.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 |
# File 'lib/ably/realtime/channel.rb', line 31 class Channel include Ably::Modules::Conversions include Ably::Modules::EventEmitter include Ably::Modules::EventMachineHelpers include Ably::Modules::AsyncWrapper include Ably::Modules::MessageEmitter extend Ably::Modules::Enum STATE = ruby_enum('STATE', :initialized, :attaching, :attached, :detaching, :detached, :failed ) include Ably::Modules::StateEmitter include Ably::Modules::UsesStateMachine ensure_state_machine_emits 'Ably::Models::ChannelStateChange' # Max number of messages to bundle in a single ProtocolMessage MAX_PROTOCOL_MESSAGE_BATCH_SIZE = 50 # {Ably::Realtime::Client} associated with this channel # @return [Ably::Realtime::Client] attr_reader :client # Channel name # @return [String] attr_reader :name # Channel options configured for this channel, see {#initialize} for channel_options # @return [Hash] attr_reader :options # When a channel failure occurs this attribute contains the Ably Exception # @return [Ably::Models::ErrorInfo,Ably::Exceptions::BaseAblyException] attr_reader :error_reason # The Channel manager responsible for attaching, detaching and handling failures for this channel # @return [Ably::Realtime::Channel::ChannelManager] # @api private attr_reader :manager # Serial number assigned to this channel when it was attached # @return [Integer] # @api private attr_reader :attached_serial # Initialize a new Channel object # # @param client [Ably::Rest::Client] # @param name [String] The name of the channel # @param channel_options [Hash] Channel options, currently reserved for Encryption options # @option channel_options [Hash,Ably::Models::CipherParams] :cipher A hash of options or a {Ably::Models::CipherParams} to configure the encryption. *:key* is required, all other options are optional. See {Ably::Util::Crypto#initialize} for a list of +:cipher+ options # def initialize(client, name, = {}) ensure_utf_8 :name, name @client = client @name = name @queue = [] @state_machine = ChannelStateMachine.new(self) @state = STATE(state_machine.current_state) @manager = ChannelManager.new(self, client.connection) setup_event_handlers setup_presence end # Publish one or more messages to the channel. # # When publishing a message, if the channel is not attached, the channel is implicitly attached # # @param name [String, Array<Ably::Models::Message|Hash>, nil] The event name of the message to publish, or an Array of [Ably::Model::Message] objects or [Hash] objects with +:name+ and +:data+ pairs # @param data [String, ByteArray, nil] The message payload unless an Array of [Ably::Model::Message] objects passed in the first argument # @param attributes [Hash, nil] Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string # # @yield [Ably::Models::Message,Array<Ably::Models::Message>] On success, will call the block with the {Ably::Models::Message} if a single message is publishde, or an Array of {Ably::Models::Message} when multiple messages are published # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callbacks # # @example # # Publish a single message # channel.publish 'click', { x: 1, y: 2 } # # # Publish an array of message Hashes # messages = [ # { name: 'click', { x: 1, y: 2 } }, # { name: 'click', { x: 2, y: 3 } } # ] # channel.publish messages # # # Publish an array of Ably::Models::Message objects # messages = [ # Ably::Models::Message(name: 'click', { x: 1, y: 2 }) # Ably::Models::Message(name: 'click', { x: 2, y: 3 }) # ] # channel.publish messages # # channel.publish('click', 'body') do |message| # puts "#{message.name} event received with #{message.data}" # end # # channel.publish('click', 'body').errback do |error, message| # puts "#{message.name} was not received, error #{error.message}" # end # def publish(name, data = nil, attributes = {}, &success_block) raise Ably::Exceptions::ChannelInactive.new('Cannot publish messages on a detached channel') if detached? || detaching? raise Ably::Exceptions::ChannelInactive.new('Cannot publish messages on a failed channel') if failed? if !connection. raise Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is configured to disallow queueing of messages and connection is currently #{connection.state}") end = if name.kind_of?(Enumerable) name else ensure_utf_8 :name, name, allow_nil: true ensure_supported_payload data [{ name: name, data: data }.merge(attributes)] end ().tap do |deferrable| deferrable.callback(&success_block) if block_given? end end # Subscribe to messages matching providing event name, or all messages if event name not provided. # # When subscribing to messages, if the channel is not attached, the channel is implicitly attached # # @param names [String] The event name of the message to subscribe to if provided. Defaults to all events. # @yield [Ably::Models::Message] For each message received, the block is called # # @return [void] # def subscribe(*names, &callback) attach unless attached? || attaching? super end # Unsubscribe the matching block for messages matching providing event name, or all messages if event name not provided. # If a block is not provided, all subscriptions will be unsubscribed # # @param names [String] The event name of the message to subscribe to if provided. Defaults to all events. # # @return [void] # def unsubscribe(*names, &callback) super end # Attach to this channel, and call the block if provided when attached. # Attaching to a channel is implicit in when a message is published or #subscribe is called, so it is uncommon # to need to call attach explicitly. # # @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Attached state # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback # def attach(&success_block) if connection.closing? || connection.closed? || connection.suspended? || connection.failed? raise Ably::Exceptions::InvalidStateChange.new("Cannot ATTACH channel when the connection is in a closed, suspended or failed state. Connection state: #{connection.state}") end transition_state_machine :attaching if can_transition_to?(:attaching) deferrable_for_state_change_to(STATE.Attached, &success_block) end # Detach this channel, and call the block if provided when in a Detached or Failed state # # @yield [Ably::Realtime::Channel] Block is called as soon as this channel is in the Detached or Failed state # @return [Ably::Util::SafeDeferrable] Deferrable that supports both success (callback) and failure (errback) callback # def detach(&success_block) if initialized? success_block.call if block_given? return Ably::Util::SafeDeferrable.new(logger).tap do |deferrable| EventMachine.next_tick { deferrable.succeed } end end raise exception_for_state_change_to(:detaching) if failed? transition_state_machine :detaching if can_transition_to?(:detaching) deferrable_for_state_change_to(STATE.Detached, &success_block) end # Presence object for this Channel. This controls this client's # presence on the channel and may also be used to obtain presence information # and change events for other members of the channel. # # @return {Ably::Realtime::Presence} # def presence @presence end # Return the message history of the channel # # If the channel is attached, you can retrieve messages published on the channel before the # channel was attached with the option <tt>until_attach: true</tt>. This is useful when a developer # wishes to display historical messages with the guarantee that no messages have been missed since attach. # # @param (see Ably::Rest::Channel#history) # @option options (see Ably::Rest::Channel#history) # @option options [Boolean] :until_attach When true, the history request will be limited only to messages published before this channel was attached. Channel must be attached # # @yield [Ably::Models::PaginatedResult<Ably::Models::Message>] First {Ably::Models::PaginatedResult page} of {Ably::Models::Message} objects accessible with {Ably::Models::PaginatedResult#items #items}. # # @return [Ably::Util::SafeDeferrable] # def history( = {}, &callback) if .delete(:until_attach) raise ArgumentError, 'option :until_attach is invalid as the channel is not attached' unless attached? [:from_serial] = attached_serial end async_wrap(callback) do rest_channel.history(.merge(async_blocking_operations: true)) end end # @!attribute [r] __incoming_msgbus__ # @return [Ably::Util::PubSub] Client library internal channel incoming message bus # @api private def __incoming_msgbus__ @__incoming_msgbus__ ||= Ably::Util::PubSub.new( coerce_into: Proc.new { |event| Ably::Models::ProtocolMessage::ACTION(event) } ) end # @api private def set_failed_channel_error_reason(error) @error_reason = error end # @api private def clear_error_reason @error_reason = nil end # @api private def set_attached_serial(serial) @attached_serial = serial end # @api private def () @options = .clone.freeze end # Used by {Ably::Modules::StateEmitter} to debug state changes # @api private def logger client.logger end # As we are using a state machine, do not allow change_state to be used # #transition_state_machine must be used instead private :change_state private def queue @queue end def setup_event_handlers __incoming_msgbus__.subscribe(:message) do || .decode self .name, end on(STATE.Attached) do process_queue end end # Queue messages and process queue if channel is attached. # If channel is not yet attached, attempt to attach it before the message queue is processed. # @return [Ably::Util::SafeDeferrable] def () = Array().map do |raw_msg| (raw_msg).tap do || next if .client_id.nil? if .client_id == '*' raise Ably::Exceptions::IncompatibleClientId.new('Wildcard client_id is reserved and cannot be used when publishing messages', 400, 40012) end unless client.auth.can_assume_client_id?(.client_id) raise Ably::Exceptions::IncompatibleClientId.new("Cannot publish with client_id '#{.client_id}' as it is incompatible with the current configured client_id '#{client.client_id}'", 400, 40012) end end end queue.push(*) if attached? process_queue else attach end if .count == 1 # A message is a Deferrable so, if publishing only one message, simply return that Deferrable .first else () end end # A deferrable object that calls the success callback once all messages are delivered # If any message fails, the errback is called immediately # Only one callback or errback is ever called i.e. if a group of messages all fail, only once # errback will be invoked def () expected_deliveries = .count actual_deliveries = 0 failed = false Ably::Util::SafeDeferrable.new(logger).tap do |deferrable| .each do || .callback do next if failed actual_deliveries += 1 deferrable.succeed if actual_deliveries == expected_deliveries end .errback do |error| next if failed failed = true deferrable.fail error, end end end end def !queue.empty? end # Move messages from Channel Queue into Outgoing Connection Queue def process_queue condition = -> { attached? && } non_blocking_loop_while(condition) do queue.shift(MAX_PROTOCOL_MESSAGE_BATCH_SIZE) end end def () connection.( action: Ably::Models::ProtocolMessage::ACTION.Message.to_i, channel: name, messages: ) end def () Ably::Models::Message(.dup).tap do |msg| msg.encode self end end def rest_channel client.rest_client.channel(name) end def connection client.connection end def setup_presence @presence ||= Presence.new(self) end end |
Instance Method Details
#attach {|Ably::Realtime::Channel| ... } ⇒ Ably::Util::SafeDeferrable
Attach to this channel, and call the block if provided when attached. Attaching to a channel is implicit in when a message is published or #subscribe is called, so it is uncommon to need to call attach explicitly.
193 194 195 196 197 198 199 200 |
# File 'lib/ably/realtime/channel.rb', line 193 def attach(&success_block) if connection.closing? || connection.closed? || connection.suspended? || connection.failed? raise Ably::Exceptions::InvalidStateChange.new("Cannot ATTACH channel when the connection is in a closed, suspended or failed state. Connection state: #{connection.state}") end transition_state_machine :attaching if can_transition_to?(:attaching) deferrable_for_state_change_to(STATE.Attached, &success_block) end |
#clear_error_reason ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
271 272 273 |
# File 'lib/ably/realtime/channel.rb', line 271 def clear_error_reason @error_reason = nil end |
#detach {|Ably::Realtime::Channel| ... } ⇒ Ably::Util::SafeDeferrable
Detach this channel, and call the block if provided when in a Detached or Failed state
207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/ably/realtime/channel.rb', line 207 def detach(&success_block) if initialized? success_block.call if block_given? return Ably::Util::SafeDeferrable.new(logger).tap do |deferrable| EventMachine.next_tick { deferrable.succeed } end end raise exception_for_state_change_to(:detaching) if failed? transition_state_machine :detaching if can_transition_to?(:detaching) deferrable_for_state_change_to(STATE.Detached, &success_block) end |
#history(options = {}) {|Ably::Models::PaginatedResult<Ably::Models::Message>| ... } ⇒ Ably::Util::SafeDeferrable
Return the message history of the channel
If the channel is attached, you can retrieve messages published on the channel before the channel was attached with the option until_attach: true
. This is useful when a developer wishes to display historical messages with the guarantee that no messages have been missed since attach.
245 246 247 248 249 250 251 252 253 254 |
# File 'lib/ably/realtime/channel.rb', line 245 def history( = {}, &callback) if .delete(:until_attach) raise ArgumentError, 'option :until_attach is invalid as the channel is not attached' unless attached? [:from_serial] = attached_serial end async_wrap(callback) do rest_channel.history(.merge(async_blocking_operations: true)) end end |
#logger ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Used by Modules::StateEmitter to debug state changes
287 288 289 |
# File 'lib/ably/realtime/channel.rb', line 287 def logger client.logger end |
#presence ⇒ Ably::Realtime::Presence
Presence object for this Channel. This controls this client’s presence on the channel and may also be used to obtain presence information and change events for other members of the channel.
227 228 229 |
# File 'lib/ably/realtime/channel.rb', line 227 def presence @presence end |
#publish(name, data = nil, attributes = {}) {|Ably::Models::Message, Array<Ably::Models::Message>| ... } ⇒ Ably::Util::SafeDeferrable
Publish one or more messages to the channel.
When publishing a message, if the channel is not attached, the channel is implicitly attached
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/ably/realtime/channel.rb', line 140 def publish(name, data = nil, attributes = {}, &success_block) raise Ably::Exceptions::ChannelInactive.new('Cannot publish messages on a detached channel') if detached? || detaching? raise Ably::Exceptions::ChannelInactive.new('Cannot publish messages on a failed channel') if failed? if !connection. raise Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is configured to disallow queueing of messages and connection is currently #{connection.state}") end = if name.kind_of?(Enumerable) name else ensure_utf_8 :name, name, allow_nil: true ensure_supported_payload data [{ name: name, data: data }.merge(attributes)] end ().tap do |deferrable| deferrable.callback(&success_block) if block_given? end end |
#set_attached_serial(serial) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
276 277 278 |
# File 'lib/ably/realtime/channel.rb', line 276 def set_attached_serial(serial) @attached_serial = serial end |
#set_failed_channel_error_reason(error) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
266 267 268 |
# File 'lib/ably/realtime/channel.rb', line 266 def set_failed_channel_error_reason(error) @error_reason = error end |
#subscribe(*names) {|Ably::Models::Message| ... } ⇒ void
This method returns an undefined value.
Subscribe to messages matching providing event name, or all messages if event name not provided.
When subscribing to messages, if the channel is not attached, the channel is implicitly attached
170 171 172 173 |
# File 'lib/ably/realtime/channel.rb', line 170 def subscribe(*names, &callback) attach unless attached? || attaching? super end |
#unsubscribe(*names, &callback) ⇒ void
This method returns an undefined value.
Unsubscribe the matching block for messages matching providing event name, or all messages if event name not provided. If a block is not provided, all subscriptions will be unsubscribed
182 183 184 |
# File 'lib/ably/realtime/channel.rb', line 182 def unsubscribe(*names, &callback) super end |
#update_options(channel_options) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
281 282 283 |
# File 'lib/ably/realtime/channel.rb', line 281 def () @options = .clone.freeze end |