Class: GraphQL::Subscriptions::ActionCableSubscriptions

Inherits:
GraphQL::Subscriptions show all
Defined in:
lib/graphql/subscriptions/action_cable_subscriptions.rb

Overview

A subscriptions implementation that sends data as ActionCable broadcastings.

Some things to keep in mind:

  • No queueing system; ActiveJob should be added
  • Take care to reload context when re-delivering the subscription. (see Query#subscription_update?)
  • Avoid the async ActionCable adapter and use the redis or PostgreSQL adapters instead. Otherwise calling #trigger won't work from background jobs or the Rails console.

Examples:

Adding ActionCableSubscriptions to your schema

# Example schema that enables subscriptions delivered as ActionCable broadcastings.
class MySchema < GraphQL::Schema
  # ...
  # Plug the ActionCable-backed subscriptions implementation into this schema.
  use GraphQL::Subscriptions::ActionCableSubscriptions
end

Implementing a channel for GraphQL Subscriptions

# Example ActionCable channel that runs GraphQL operations sent by the client
# and remembers the subscriptions it created so they can be removed later.
class GraphqlChannel < ApplicationCable::Channel
  # Start each connection with an empty list of subscription IDs to clean up.
  def subscribed
    @subscription_ids = []
  end

  # Run the GraphQL operation described by `data` and transmit its result.
  #
  # `data` is read with the string keys "query", "variables" and "operationName".
  def execute(data)
    query = data["query"]
    variables = ensure_hash(data["variables"])
    operation_name = data["operationName"]

    result = MySchema.execute(
      query: query,
      context: {
        # Re-implement whatever context methods you need
        # in this channel or ApplicationCable::Channel
        # current_user: current_user,
        # Make sure the channel is in the context
        channel: self,
      },
      variables: variables,
      operation_name: operation_name
    )

    payload = {
      result: result.to_h,
      more: result.subscription?,
    }

    # Remember the subscription (if one was created) so that it can be
    # removed again when the client unsubscribes.
    subscription_id = result.context[:subscription_id]
    @subscription_ids << subscription_id if subscription_id

    transmit(payload)
  end

  # Remove every subscription that was registered through this channel.
  def unsubscribed
    @subscription_ids.each do |sid|
      MySchema.subscriptions.delete_subscription(sid)
    end
  end

  private

  # Normalize the incoming "variables" value to a hash-like object.
  #
  # Strings are parsed as JSON (blank strings become `{}`), hashes and
  # ActionController::Parameters pass through unchanged, `nil` becomes `{}`;
  # anything else raises ArgumentError.
  def ensure_hash(ambiguous_param)
    case ambiguous_param
    when String
      ambiguous_param.present? ? ensure_hash(JSON.parse(ambiguous_param)) : {}
    when Hash, ActionController::Parameters then ambiguous_param
    when nil then {}
    else
      raise ArgumentError, "Unexpected parameter: #{ambiguous_param}"
    end
  end
end

Constant Summary collapse

# String prefix; presumably prepended to per-subscription stream names (usage not shown here).
SUBSCRIPTION_PREFIX = "graphql-subscription:"
# String prefix; presumably prepended to per-event stream names (usage not shown here).
EVENT_PREFIX = "graphql-event:"

Instance Attribute Summary

Attributes inherited from GraphQL::Subscriptions

#default_broadcastable

Instance Method Summary collapse

Methods inherited from GraphQL::Subscriptions

#broadcastable?, #build_id, #execute, #execute_update, #normalize_name, #trigger, use

Constructor Details

#initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest) ⇒ ActionCableSubscriptions

Returns a new instance of ActionCableSubscriptions.

Parameters:

  • serializer (<#dump(obj), #load(string)>) (defaults to: Serialize)

    Used for serializing messages before handing them to `.broadcast(msg)`

  • namespace (string) (defaults to: '')

    Used to namespace events and subscriptions (default: '')



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 90

# Build a subscriptions implementation that delivers over ActionCable.
#
# @param serializer [<#dump(obj), #load(string)>] Used for serializing messages before handing them to `.broadcast(msg)`.
#   Its `.load` must have an arity of exactly one (message) or two (message, context).
# @param namespace [String] Used to namespace events and subscriptions (default: '')
# @param action_cable [#server] Object whose `.server.broadcast` is used to publish messages
# @param action_cable_coder [Object] Passed as `coder:` to `channel.stream_from` when listening for events
# @param rest [Hash] Remaining options, forwarded to the superclass constructor
# @raise [ArgumentError] if `serializer.load` does not accept exactly one or two arguments
def initialize(serializer: Serialize, namespace: '', action_cable: ActionCable, action_cable_coder: ActiveSupport::JSON, **rest)
  # A per-process map of subscriptions to deliver.
  # This is provided by Rails, so let's use it
  @subscriptions = Concurrent::Map.new
  # topic => fingerprint => list of events; missing levels are created on first access
  @events = Concurrent::Map.new { |h, k| h[k] = Concurrent::Map.new { |h2, k2| h2[k2] = Concurrent::Array.new } }
  @action_cable = action_cable
  @action_cable_coder = action_cable_coder
  @serializer = serializer
  # Decide once whether `.load` also wants the query context as a second argument
  @serialize_with_context = case @serializer.method(:load).arity
  when 1
    false
  when 2
    true
  else
    raise ArgumentError, "#{@serializer} must respond to `.load` accepting one or two arguments"
  end
  @transmit_ns = namespace
  # Bare `super` forwards this method's arguments to the superclass constructor
  super
end

Instance Method Details

#delete_subscription(subscription_id) ⇒ Object

The channel was closed, forget about it.



217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 217

# The channel was closed, forget about it: drop the stored query and
# unregister each of its events from the per-topic/per-fingerprint lists.
#
# @param subscription_id [String] ID of the subscription to remove
# @return [Object, nil] the removed query's events, or `nil` when no query was stored
def delete_subscription(subscription_id)
  removed_query = @subscriptions.delete(subscription_id)
  # This can be `nil` when `.trigger` happens inside an unsubscribed ActionCable channel,
  # see https://github.com/rmosolgo/graphql-ruby/issues/2478
  return unless removed_query

  subscribed_events = removed_query.context.namespace(:subscriptions)[:events]
  subscribed_events.each do |event|
    by_fingerprint = @events[event.topic]
    listeners = by_fingerprint[event.fingerprint]
    listeners.delete(event)
    # Don't keep an empty list around for a fingerprint nobody listens to
    by_fingerprint.delete(event.fingerprint) if listeners.empty?
  end
end

#deliver(subscription_id, result) ⇒ Object

This subscription was re-evaluated. Send it to the specific stream where this client was waiting.



120
121
122
123
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 120

# This subscription was re-evaluated.
# Broadcast the result on the stream belonging to this subscription,
# flagged with `more: true`.
#
# @param subscription_id [String]
# @param result [#to_h] the re-evaluated query result
def deliver(subscription_id, result)
  update = { result: result.to_h, more: true }
  server = @action_cable.server
  stream = stream_subscription_name(subscription_id)
  server.broadcast(stream, update)
end

#execute_all(event, object) ⇒ Object

An event was triggered; push the data over ActionCable. Subscribers will re-evaluate locally.



112
113
114
115
116
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 112

# An event was triggered: serialize the object and broadcast it on the
# event's stream. Subscribers will re-evaluate locally.
#
# @param event [Object] the triggered event (used to derive the stream name)
# @param object [Object] the application object, dumped with the configured serializer
def execute_all(event, object)
  event_stream = stream_event_name(event)
  serialized_object = @serializer.dump(object)
  @action_cable.server.broadcast(event_stream, serialized_object)
end

#load_action_cable_message(message, context) ⇒ Object

This is called to turn an ActionCable-broadcasted string (JSON) into a query-ready application object.

Parameters:

  • message (String)

    An ActionCable-broadcasted string (JSON)

  • context (GraphQL::Query::Context)

    the context of the first event for a given subscription fingerprint



190
191
192
193
194
195
196
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 190

# Turn an ActionCable-broadcasted string (JSON) into a query-ready application object.
#
# @param message [String] the broadcasted message
# @param context [GraphQL::Query::Context] the context of the first event for a given subscription fingerprint
# @return [Object] whatever the configured serializer's `.load` returns
def load_action_cable_message(message, context)
  # Serializers whose `.load` takes one argument only get the message
  return @serializer.load(message) unless @serialize_with_context

  @serializer.load(message, context)
end

#read_subscription(subscription_id) ⇒ Object

Return the query from "storage" (in memory)



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 199

# Return the query from "storage" (in memory), as a hash of its parts.
#
# @param subscription_id [String]
# @return [Hash, nil] `:query_string`, `:variables`, `:context` and `:operation_name`,
#   or `nil` when nothing is stored under this ID
def read_subscription(subscription_id)
  stored = @subscriptions[subscription_id]
  # A missing query can happen when a subscription is triggered from an unsubscribed channel,
  # see https://github.com/rmosolgo/graphql-ruby/issues/2478.
  # (This `nil` is handled by `#execute_update`)
  return nil if stored.nil?

  {
    query_string: stored.query_string,
    variables: stored.provided_variables,
    context: stored.context.to_h,
    operation_name: stored.operation_name,
  }
end

#setup_stream(channel, initial_event) ⇒ Object

Every subscribing channel is listening here, but only one of them takes any action. This is so we can reuse payloads when possible, and make one payload to send to all subscribers.

But the problem is, any channel could close at any time, so each channel has to be ready to take over the primary position.

To make sure there's always one-and-only-one channel building payloads, let the listener belonging to the first event on the list be the one to build and publish payloads.



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 160

# Subscribe `channel` to the stream for `initial_event`'s topic.
#
# Every subscribing channel listens here, but for each fingerprint only the
# listener whose `initial_event` is first in that fingerprint's list does the
# work: it re-evaluates the query once and delivers the same result to every
# event in the list.
#
# @param channel [#stream_from] the ActionCable channel to listen on
# @param initial_event [Object] the event this listener was registered for
def setup_stream(channel, initial_event)
  topic = initial_event.topic
  event_stream = stream_event_name(initial_event)
  channel.stream_from(event_stream, coder: @action_cable_coder) do |message|
    # Loaded lazily, and shared between fingerprints handled by this listener
    loaded_object = nil
    @events[topic].each do |_fingerprint, listeners|
      # Only the listener belonging to the first event in the list takes action
      next unless listeners.any? && listeners.first == initial_event

      # The fingerprint has told us that this response should be shared by all subscribers,
      # so just run it once, then deliver the result to every subscriber
      leader = listeners.first
      leader_subscription_id = leader.context.fetch(:subscription_id)
      loaded_object ||= load_action_cable_message(message, leader.context)
      update = execute_update(leader_subscription_id, leader, loaded_object)
      next if update.nil?

      # Having calculated the result _once_, send the same payload to all subscribers
      listeners.each do |listener|
        deliver(listener.context.fetch(:subscription_id), update)
      end
    end
    nil
  end
end

#write_subscription(query, events) ⇒ Object

A query was run where these events were subscribed to. Store them in memory in this ActionCable frontend. It will receive notifications when events come in and re-evaluate the query locally.



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 129

# A query was run where these events were subscribed to.
# Store them in memory in this ActionCable frontend.
# It will receive notifications when events come in
# and re-evaluate the query locally.
#
# @param query [GraphQL::Query] the subscription query; its context must contain `:channel`
# @param events [Array] the events this query subscribed to
# @raise [GraphQL::Error] when `query.context[:channel]` is missing
def write_subscription(query, events)
  channel = query.context[:channel]
  unless channel
    # Adjacent literals are concatenated as-is, so each fragment carries its own trailing space
    raise GraphQL::Error, "This GraphQL Subscription client does not support the transport protocol expected " \
      "by the backend Subscription Server implementation (graphql-ruby ActionCableSubscriptions in this case). " \
      "Some official client implementation including Apollo (https://graphql-ruby.org/javascript_client/apollo_subscriptions.html), " \
      "Relay Modern (https://graphql-ruby.org/javascript_client/relay_subscriptions.html#actioncable). " \
      "GraphiQL via `graphiql-rails` may not work out of box (#1051)."
  end
  # Reuse an ID already present in the context, otherwise generate one and store it there
  subscription_id = query.context[:subscription_id] ||= build_id
  stream = stream_subscription_name(subscription_id)
  channel.stream_from(stream)
  @subscriptions[subscription_id] = query
  events.each do |event|
    # Setup a new listener to run all events with this topic in this process
    setup_stream(channel, event)
    # Add this event to the list of events to be updated
    @events[event.topic][event.fingerprint] << event
  end
end