Class: GraphQL::Subscriptions::ActionCableSubscriptions
- Inherits: GraphQL::Subscriptions
  - Object
  - GraphQL::Subscriptions
  - GraphQL::Subscriptions::ActionCableSubscriptions
- Defined in: lib/graphql/subscriptions/action_cable_subscriptions.rb
Overview
A subscriptions implementation that sends data as ActionCable broadcastings.
Some things to keep in mind:
- There is no queueing system; ActiveJob should be added.
- Take care to reload context when re-delivering the subscription (see Query#subscription_update?).
- Avoid the async ActionCable adapter; use the Redis or PostgreSQL adapter instead. Otherwise, calling #trigger won't work from background jobs or the Rails console.
Constant Summary
- SUBSCRIPTION_PREFIX = "graphql-subscription:"
- EVENT_PREFIX = "graphql-event:"
Instance Attribute Summary
Attributes inherited from GraphQL::Subscriptions
Instance Method Summary
- #delete_subscription(subscription_id) ⇒ Object
  The channel was closed, forget about it.
- #deliver(subscription_id, result) ⇒ Object
  This subscription was re-evaluated.
- #execute_all(event, object) ⇒ Object
  An event was triggered; Push the data over ActionCable.
- #initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest) ⇒ ActionCableSubscriptions (constructor)
  A new instance of ActionCableSubscriptions.
- #load_action_cable_message(message, context) ⇒ Object
  This is called to turn an ActionCable-broadcasted string (JSON) into a query-ready application object.
- #read_subscription(subscription_id) ⇒ Object
  Return the query from "storage" (in memory).
- #setup_stream(channel, initial_event) ⇒ Object
  Every subscribing channel is listening here, but only one of them takes any action.
- #write_subscription(query, events) ⇒ Object
  A query was run where these events were subscribed to.
Methods inherited from GraphQL::Subscriptions
#broadcastable?, #build_id, #execute, #execute_update, #normalize_name, #trigger, use, #validate_update?
Constructor Details
#initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest) ⇒ ActionCableSubscriptions
Returns a new instance of ActionCableSubscriptions.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 90

def initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest)
  # A per-process map of subscriptions to deliver.
  # This is provided by Rails, so let's use it
  @subscriptions = Concurrent::Map.new
  @events = Concurrent::Map.new do |h, k|
    h.compute_if_absent(k) do
      Concurrent::Map.new do |h2, k2|
        h2.compute_if_absent(k2) { Concurrent::Array.new }
      end
    end
  end
  @action_cable = action_cable
  @action_cable_coder = action_cable_coder
  @serializer = serializer
  @serialize_with_context = case @serializer.method(:load).arity
  when 1
    false
  when 2
    true
  else
    raise ArgumentError, "#{@serializer} must respond to `.load` accepting one or two arguments"
  end
  @transmit_ns = namespace
  super
end
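The constructor inspects the arity of the serializer's `.load` method to decide whether to pass the query context along. The following is a minimal sketch of that check in plain Ruby, not the library's own code. The three serializer modules and the `serialize_with_context?` helper are hypothetical names introduced for illustration.

```ruby
require "json"

# Hypothetical serializers, for illustration only.
module PlainSerializer
  def self.dump(obj)
    JSON.generate(obj)
  end

  # One required argument: arity is 1.
  def self.load(str)
    JSON.parse(str)
  end
end

module ContextSerializer
  def self.dump(obj)
    JSON.generate(obj)
  end

  # Two required arguments: arity is 2.
  def self.load(str, context)
    { "value" => JSON.parse(str), "viewer" => context[:viewer] }
  end
end

module OptionalArgSerializer
  # An optional parameter makes the arity negative (-2 here),
  # so the check below rejects it.
  def self.load(str, context = nil)
    JSON.parse(str)
  end
end

# Mirrors the `case` statement in #initialize (helper name is an assumption).
def serialize_with_context?(serializer)
  case serializer.method(:load).arity
  when 1 then false
  when 2 then true
  else
    raise ArgumentError, "#{serializer} must respond to `.load` accepting one or two arguments"
  end
end
```

One consequence worth noting: a `.load` with an optional second parameter reports a negative arity, so under this check it would raise rather than be treated as context-aware.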
Instance Method Details
#delete_subscription(subscription_id) ⇒ Object
The channel was closed, forget about it.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 225

def delete_subscription(subscription_id)
  query = @subscriptions.delete(subscription_id)
  # In case this came from the server, tell the client to unsubscribe:
  @action_cable.server.broadcast(stream_subscription_name(subscription_id), { more: false })
  # This can be `nil` when `.trigger` happens inside an unsubscribed ActionCable channel,
  # see https://github.com/rmosolgo/graphql-ruby/issues/2478
  if query
    events = query.context.namespace(:subscriptions)[:events]
    events.each do |event|
      ev_by_fingerprint = @events[event.topic]
      ev_for_fingerprint = ev_by_fingerprint[event.fingerprint]
      ev_for_fingerprint.delete(event)
      if ev_for_fingerprint.empty?
        ev_by_fingerprint.delete(event.fingerprint)
      end
    end
  end
end
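The cleanup above walks a two-level index (topic, then fingerprint) whose leaves are lists of events, and prunes a fingerprint entry once its list is empty. As a rough sketch of that bookkeeping, the example below uses plain `Hash` objects with default blocks as a stand-in for `Concurrent::Map`. This stand-in is not thread-safe, and `Event`, `build_registry`, `register` and `unregister` are hypothetical names.

```ruby
# Hypothetical stand-in for a subscription event.
Event = Struct.new(:topic, :fingerprint, :subscription_id)

# topic => fingerprint => [events]. Plain Hashes stand in for
# Concurrent::Map here and are NOT thread-safe.
def build_registry
  Hash.new do |by_topic, topic|
    by_topic[topic] = Hash.new { |by_fp, fp| by_fp[fp] = [] }
  end
end

# Same shape as `@events[event.topic][event.fingerprint] << event`.
def register(registry, event)
  registry[event.topic][event.fingerprint] << event
end

# Same shape as the loop in #delete_subscription: remove the event,
# then drop the fingerprint entry if no events remain under it.
def unregister(registry, event)
  by_fingerprint = registry[event.topic]
  list = by_fingerprint[event.fingerprint]
  list.delete(event)
  by_fingerprint.delete(event.fingerprint) if list.empty?
end
```

Pruning empty fingerprint entries keeps the per-topic map from growing as clients connect and disconnect over the life of the process.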
#deliver(subscription_id, result) ⇒ Object
This subscription was re-evaluated. Send it to the specific stream where this client was waiting.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 126

def deliver(subscription_id, result)
  has_more = !result.context.namespace(:subscriptions)[:final_update]
  payload = { result: result.to_h, more: has_more }
  @action_cable.server.broadcast(stream_subscription_name(subscription_id), payload)
end
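To make the payload shape concrete, here is a small sketch with a recording object in place of `ActionCable.server`. `RecordingServer` and `deliver_sketch` are hypothetical. The stream naming (prefix, then namespace, then subscription id) is an assumption, since `stream_subscription_name` is not shown on this page.

```ruby
# Hypothetical recorder standing in for `ActionCable.server`.
class RecordingServer
  attr_reader :broadcasts

  def initialize
    @broadcasts = []
  end

  def broadcast(stream, payload)
    @broadcasts << [stream, payload]
  end
end

SUBSCRIPTION_PREFIX = "graphql-subscription:"

# ASSUMPTION: the real (private) helper may build the name differently.
def stream_subscription_name(namespace, subscription_id)
  "#{SUBSCRIPTION_PREFIX}#{namespace}#{subscription_id}"
end

# Mirrors the shape of #deliver: `more` is false only when this is
# flagged as the final update for the subscription.
def deliver_sketch(server, namespace, subscription_id, result_hash, final_update: false)
  payload = { result: result_hash, more: !final_update }
  server.broadcast(stream_subscription_name(namespace, subscription_id), payload)
end
```

A client would treat `more: false` as the signal to unsubscribe, which matches the `{ more: false }` broadcast in `#delete_subscription`.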
#execute_all(event, object) ⇒ Object
An event was triggered; Push the data over ActionCable. Subscribers will re-evaluate locally.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 118

def execute_all(event, object)
  stream = stream_event_name(event)
  message = @serializer.dump(object)
  @action_cable.server.broadcast(stream, message)
end
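The division of labour here is that the triggering side serializes the object once and broadcasts it once, while each listening process decodes the message and re-evaluates its own queries. The sketch below imitates that flow with a tiny in-memory pub/sub. `TinyPubSub`, `event_stream_name` and `execute_all_sketch` are hypothetical, and building the stream name from the prefix plus topic is an assumption.

```ruby
require "json"

# Hypothetical in-memory pub/sub standing in for an ActionCable adapter.
class TinyPubSub
  def initialize
    @listeners = Hash.new { |h, k| h[k] = [] }
  end

  def subscribe(stream, &block)
    @listeners[stream] << block
  end

  def broadcast(stream, message)
    @listeners[stream].each { |listener| listener.call(message) }
  end
end

EVENT_PREFIX = "graphql-event:"

# ASSUMPTION: the real stream name may also include a namespace.
def event_stream_name(topic)
  "#{EVENT_PREFIX}#{topic}"
end

# Mirrors the shape of #execute_all: dump once, broadcast once.
def execute_all_sketch(pubsub, topic, object)
  message = JSON.generate(object)
  pubsub.broadcast(event_stream_name(topic), message)
end
```

Each subscriber block plays the role of one listening process: it receives the same string and decides locally what to do with it.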
#load_action_cable_message(message, context) ⇒ Object
This is called to turn an ActionCable-broadcasted string (JSON) into a query-ready application object.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 198

def load_action_cable_message(message, context)
  if @serialize_with_context
    @serializer.load(message, context)
  else
    @serializer.load(message)
  end
end
#read_subscription(subscription_id) ⇒ Object
Return the query from "storage" (in memory).
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 207

def read_subscription(subscription_id)
  query = @subscriptions[subscription_id]
  if query.nil?
    # This can happen when a subscription is triggered from an unsubscribed channel,
    # see https://github.com/rmosolgo/graphql-ruby/issues/2478.
    # (This `nil` is handled by `#execute_update`)
    nil
  else
    {
      query_string: query.query_string,
      variables: query.provided_variables,
      context: query.context.to_h,
      operation_name: query.operation_name,
    }
  end
end
#setup_stream(channel, initial_event) ⇒ Object
Every subscribing channel is listening here, but only one of them takes any action. This is so we can reuse payloads when possible, and make one payload to send to all subscribers.
But the problem is, any channel could close at any time, so each channel has to be ready to take over the primary position.
To make sure there's always one-and-only-one channel building payloads, let the listener belonging to the first event on the list be the one to build and publish payloads.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 167

def setup_stream(channel, initial_event)
  topic = initial_event.topic
  event_stream = stream_event_name(initial_event)
  channel.stream_from(event_stream, coder: @action_cable_coder) do |message|
    events_by_fingerprint = @events[topic]
    object = nil
    events_by_fingerprint.each do |_fingerprint, events|
      if !events.empty? && events.first == initial_event
        # The fingerprint has told us that this response should be shared by all subscribers,
        # so just run it once, then deliver the result to every subscriber
        first_event = events.first
        first_subscription_id = first_event.context.fetch(:subscription_id)
        object ||= load_action_cable_message(message, first_event.context)
        result = execute_update(first_subscription_id, first_event, object)
        if !result.nil?
          # Having calculated the result _once_, send the same payload to all subscribers
          events.each do |event|
            subscription_id = event.context.fetch(:subscription_id)
            deliver(subscription_id, result)
          end
        end
      end
    end
    nil
  end
end
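The "first event on the list acts" rule can be illustrated without ActionCable. In this sketch every listener runs the same check against a shared list, and only the one whose own event is currently first does the work. When that event is removed, the next listener takes over on the following broadcast. `StreamEvent` and `handle_broadcast` are hypothetical names, and query execution is reduced to returning the list of subscription ids that would receive the shared result.

```ruby
# Hypothetical event for this sketch.
StreamEvent = Struct.new(:subscription_id)

# Returns the subscription ids this listener would deliver to, or an
# empty list if another listener currently holds the primary position.
def handle_broadcast(events, listener_event)
  return [] if events.empty? || events.first != listener_event

  # In the real method, the query is executed once at this point and
  # the single result is delivered to every event in the list.
  events.map(&:subscription_id)
end
```

For example, with events for subscriptions `"a"` and `"b"` in that order, only the listener for `"a"` returns a non-empty list; after `"a"` is deleted from the list, the listener for `"b"` starts returning `["b"]`.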
#write_subscription(query, events) ⇒ Object
A query was run where these events were subscribed to. Store them in memory in this ActionCable frontend. It will receive notifications when events come in and re-evaluate the query locally.
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 136

def write_subscription(query, events)
  unless (channel = query.context[:channel])
    raise GraphQL::Error, "This GraphQL Subscription client does not support the transport protocol expected"\
      "by the backend Subscription Server implementation (graphql-ruby ActionCableSubscriptions in this case)."\
      "Some official client implementation including Apollo (https://graphql-ruby.org/javascript_client/apollo_subscriptions.html), "\
      "Relay Modern (https://graphql-ruby.org/javascript_client/relay_subscriptions.html#actioncable)."\
      "GraphiQL via `graphiql-rails` may not work out of box (#1051)."
  end
  subscription_id = query.context[:subscription_id] ||= build_id
  stream = stream_subscription_name(subscription_id)
  channel.stream_from(stream)
  @subscriptions[subscription_id] = query
  events.each do |event|
    # Setup a new listener to run all events with this topic in this process
    setup_stream(channel, event)
    # Add this event to the list of events to be updated
    @events[event.topic][event.fingerprint] << event
  end
end