Module: ActionCable::Channel::Streams
- Extended by:
- ActiveSupport::Concern
- Included in:
- Base
- Defined in:
- lib/action_cable/channel/streams.rb
Overview
# Action Cable Channel Streams
Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data placed into it is automatically sent to the clients that are connected at that time. It’s purely an online queue, though. If you’re not streaming a broadcasting at the very moment it sends out an update, you will not get that update, even if you connect after it has been sent.
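The "online queue" behaviour described above can be illustrated with a minimal in-memory sketch. `ToyPubSub` is a hypothetical stand-in written for this example only; it is not Action Cable's pubsub adapter, and it merely shows that a message broadcast before anyone subscribes is never delivered.

```ruby
# Toy in-memory stand-in for a pubsub adapter (hypothetical, not Action Cable's API).
class ToyPubSub
  def initialize
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
  end

  # Register a callable that receives every later message on +broadcasting+.
  def subscribe(broadcasting, callback)
    @subscribers[broadcasting] << callback
  end

  # Deliver +message+ to whoever is subscribed right now; nothing is stored.
  def broadcast(broadcasting, message)
    @subscribers[broadcasting].each { |callback| callback.call(message) }
  end
end

pubsub = ToyPubSub.new
received = []

pubsub.broadcast("comments_for_45", "sent before anyone was streaming")
pubsub.subscribe("comments_for_45", ->(message) { received << message })
pubsub.broadcast("comments_for_45", "sent while streaming")

p received # only the second message is present
```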
Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between the two parties (the broadcaster and the channel subscriber). Here’s an example of a channel that allows subscribers to get all new comments on a given page:
class CommentsChannel < ApplicationCable::Channel
  def follow(data)
    stream_from "comments_for_#{data['recording_id']}"
  end
  def unfollow
    stop_all_streams
  end
end
Based on the above example, the subscribers of this channel will get whatever data is put into the, let’s say, `comments_for_45` broadcasting as soon as it’s put there.
An example broadcasting for this channel looks like so:
ActionCable.server.broadcast "comments_for_45", { author: 'DHH', content: 'Rails is just swell' }
If you have a stream that is related to a model, then the broadcasting used can be generated from the model and channel. The following example would subscribe to a broadcasting like `comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE`.
class CommentsChannel < ApplicationCable::Channel
  def subscribed
    post = Post.find(params[:id])
    stream_for post
  end
end
You can then broadcast to this channel using:
CommentsChannel.broadcast_to(@post, @comment)
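The example broadcasting name `comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE` can be reproduced in plain Ruby. The sketch below assumes the name is the channel name, a colon, and the URL-safe, unpadded Base64 encoding of a Global ID URI such as `gid://TestApp/Post/1`. Both `toy_broadcasting_for` and that URI are hypothetical illustrations, not part of the Action Cable API.

```ruby
# Hypothetical helper: mirrors how the example broadcasting name appears to be built.
def toy_broadcasting_for(channel_name, gid_uri)
  # URL-safe Base64 without padding, using only core Ruby (Array#pack).
  encoded = [gid_uri].pack("m0").tr("+/", "-_").delete("=")
  "#{channel_name}:#{encoded}"
end

puts toy_broadcasting_for("comments", "gid://TestApp/Post/1")
# prints "comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE"
```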
If you don’t just want to relay the broadcast unfiltered to the subscriber, you can also supply a callback that lets you alter what is sent out. The example below shows how you can use this to provide performance introspection in the process:
class ChatChannel < ApplicationCable::Channel
  def subscribed
    @room = Chat::Room[params[:room_number]]
    stream_for @room, coder: ActiveSupport::JSON do |message|
      if message['originated_at'].present?
        elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
        ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
        logger.info "Message took #{elapsed_time}s to arrive"
      end
      transmit message
    end
  end
end
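What the `coder:` option contributes to a callback can be sketched without Rails. Here the standard library's `JSON` stands in for `ActiveSupport::JSON`, and `toy_decoding_handler` is a hypothetical helper. It only shows the idea that the callback receives a decoded Hash rather than the raw string, under those assumptions.

```ruby
require "json"

# Hypothetical helper: wraps a block so it receives a decoded Hash instead of the
# raw JSON string, roughly what passing a coder asks the stream to do.
def toy_decoding_handler(coder, &callback)
  ->(raw_message) { callback.call(coder.parse(raw_message)) }
end

delays = []
handler = toy_decoding_handler(JSON) do |message|
  if message["originated_at"]
    delays << (Time.now.to_f - message["originated_at"]).round(2)
  end
  message
end

# Simulate a payload that was stamped roughly 1.5 seconds ago.
raw = JSON.generate("originated_at" => Time.now.to_f - 1.5, "body" => "hi")
decoded = handler.call(raw)
puts decoded["body"]
puts "Message took #{delays.first}s to arrive"
```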
You can stop streaming from all broadcasts by calling #stop_all_streams.
Instance Method Summary
- #stop_all_streams ⇒ Object
  Unsubscribes all streams associated with this channel from the pubsub queue.
- #stop_stream_for(model) ⇒ Object
  Unsubscribes streams for the `model`.
- #stop_stream_from(broadcasting) ⇒ Object
  Unsubscribes streams from the named `broadcasting`.
- #stream_for(model, callback = nil, coder: nil, &block) ⇒ Object
  Start streaming the pubsub queue for the `model` in this channel.
- #stream_from(broadcasting, callback = nil, coder: nil, &block) ⇒ Object
  Start streaming from the named `broadcasting` pubsub queue.
- #stream_or_reject_for(model) ⇒ Object
  Calls stream_for with the given `model` to start streaming if it is present; otherwise rejects the subscription.
Instance Method Details
#stop_all_streams ⇒ Object
Unsubscribes all streams associated with this channel from the pubsub queue.
# File 'lib/action_cable/channel/streams.rb', line 135
def stop_all_streams
  streams.each do |broadcasting, callback|
    pubsub.unsubscribe broadcasting, callback
    logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
  end.clear
end
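The `end.clear` chain in the method above relies on `Hash#each` returning the hash itself when given a block, so every entry is visited once and the table is then emptied. A minimal sketch of that idiom follows; `toy_stop_all` and the `log` array are hypothetical stand-ins for the channel's internals.

```ruby
# Stand-in for the channel's internal streams table: broadcasting name => handler.
def toy_stop_all(streams, log)
  # Hash#each with a block returns the hash itself, so .clear can be chained.
  streams.each do |broadcasting, _callback|
    log << "stopped streaming from #{broadcasting}"
  end.clear
end

streams = { "comments_for_45" => -> {}, "comments_for_46" => -> {} }
log = []
toy_stop_all(streams, log)

p streams.empty? # the table has been emptied
p log.size       # one log entry per stream that was stopped
```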
#stop_stream_for(model) ⇒ Object
Unsubscribes streams for the `model`.
# File 'lib/action_cable/channel/streams.rb', line 130
def stop_stream_for(model)
  stop_stream_from(broadcasting_for(model))
end
#stop_stream_from(broadcasting) ⇒ Object
Unsubscribes streams from the named `broadcasting`.
# File 'lib/action_cable/channel/streams.rb', line 121
def stop_stream_from(broadcasting)
  callback = streams.delete(broadcasting)
  if callback
    pubsub.unsubscribe(broadcasting, callback)
    logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
  end
end
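Because `Hash#delete` returns `nil` for a missing key, the listing above does nothing when the channel is not streaming from the given broadcasting, which makes repeated calls harmless. A small sketch of that guard, with a hypothetical `toy_stop_stream_from` and illustrative return symbols that the real method does not return:

```ruby
# Hypothetical stand-in for the channel's streams table and stop_stream_from.
def toy_stop_stream_from(streams, broadcasting)
  # Hash#delete returns the stored handler, or nil if the key is absent.
  handler = streams.delete(broadcasting)
  handler ? :unsubscribed : :nothing_to_stop
end

streams = { "comments_for_45" => ->(message) { message } }
p toy_stop_stream_from(streams, "comments_for_45") # first call removes the handler
p toy_stop_stream_from(streams, "comments_for_45") # second call finds nothing to stop
```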
#stream_for(model, callback = nil, coder: nil, &block) ⇒ Object
Start streaming the pubsub queue for the `model` in this channel. Optionally, you can pass a `callback` that’ll be used instead of the default of just transmitting the updates straight to the subscriber.
Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing them to the callback. Defaults to `coder: nil`, which does no decoding and passes raw messages.
# File 'lib/action_cable/channel/streams.rb', line 116
def stream_for(model, callback = nil, coder: nil, &block)
  stream_from(broadcasting_for(model), callback || block, coder: coder)
end
#stream_from(broadcasting, callback = nil, coder: nil, &block) ⇒ Object
Start streaming from the named `broadcasting` pubsub queue. Optionally, you can pass a `callback` that’ll be used instead of the default of just transmitting the updates straight to the subscriber. Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing them to the callback. Defaults to `coder: nil`, which does no decoding and passes raw messages.
# File 'lib/action_cable/channel/streams.rb', line 90
def stream_from(broadcasting, callback = nil, coder: nil, &block)
  broadcasting = String(broadcasting)
  # Don't send the confirmation until pubsub#subscribe is successful
  defer_subscription_confirmation!
  # Build a stream handler by wrapping the user-provided callback with a decoder
  # or defaulting to a JSON-decoding retransmitter.
  handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
  streams[broadcasting] = handler
  connection.server.event_loop.post do
    pubsub.subscribe(broadcasting, handler, lambda do
      ensure_confirmation_sent
      logger.info "#{self.class.name} is streaming from #{broadcasting}"
    end)
  end
end
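Two details of the argument handling in the listing above are easy to miss: the broadcasting is coerced with `String(...)`, so a symbol or number is accepted, and `callback || block` means an explicit callback wins when both a callback and a block are given. The sketch below isolates just that logic in a hypothetical `toy_stream_args` helper; it performs no subscription.

```ruby
# Hypothetical helper mirroring only the argument handling of stream_from.
def toy_stream_args(broadcasting, callback = nil, &block)
  [String(broadcasting), callback || block]
end

explicit = ->(message) { "callback saw #{message}" }
name, handler = toy_stream_args(:comments_for_45, explicit) do |message|
  "block saw #{message}"
end

puts name               # prints "comments_for_45"
puts handler.call("hi") # prints "callback saw hi"
```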
#stream_or_reject_for(model) ⇒ Object
Calls stream_for with the given `model` to start streaming if it is present; otherwise rejects the subscription.
# File 'lib/action_cable/channel/streams.rb', line 144
def stream_or_reject_for(model)
  if model
    stream_for model
  else
    reject
  end
end