Class: GraphQL::Subscriptions::ActionCableSubscriptions

Inherits:
GraphQL::Subscriptions
Defined in:
lib/graphql/subscriptions/action_cable_subscriptions.rb

Overview

A subscriptions implementation that sends data as ActionCable broadcastings.

Experimental. Some things to keep in mind:

  • No queueing system is included; ActiveJob should be added.
  • Take care to reload context when re-delivering the subscription (see Query#subscription_update?).

Examples:

Adding ActionCableSubscriptions to your schema

MySchema = GraphQL::Schema.define do
  # ...
  use GraphQL::Subscriptions::ActionCableSubscriptions
end

Implementing a channel for GraphQL Subscriptions

class GraphqlChannel < ApplicationCable::Channel
  def subscribed
    @subscription_ids = []
  end

  def execute(data)
    query = data["query"]
    variables = ensure_hash(data["variables"])
    operation_name = data["operationName"]
    context = {
      current_user: current_user,
      # Make sure the channel is in the context
      channel: self,
    }

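    # Pass keyword arguments directly; a braced hash would be treated
    # as a positional argument on Ruby 3.
    result = MySchema.execute(
      query: query,
      context: context,
      variables: variables,
      operation_name: operation_name
    )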

    payload = {
      result: result.subscription? ? { data: nil } : result.to_h,
      more: result.subscription?,
    }

    # Track the subscription here so we can remove it
    # on unsubscribe.
    if result.context[:subscription_id]
      @subscription_ids << result.context[:subscription_id]
    end

    transmit(payload)
  end

  def unsubscribed
    @subscription_ids.each { |sid|
      MySchema.subscriptions.delete_subscription(sid)
    }
  end

  private

    def ensure_hash(ambiguous_param)
      case ambiguous_param
      when String
        if ambiguous_param.present?
          ensure_hash(JSON.parse(ambiguous_param))
        else
          {}
        end
      when Hash, ActionController::Parameters
        ambiguous_param
      when nil
        {}
      else
        raise ArgumentError, "Unexpected parameter: #{ambiguous_param}"
      end
    end
end
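
The `ensure_hash` helper above depends on ActiveSupport (`present?`) and `ActionController::Parameters`. To show the normalization on its own, here is a plain-Ruby sketch. The name `normalize_variables` is hypothetical, only the JSON standard library is used, and the `ActionController::Parameters` branch is left out.

```ruby
require "json"

# Hypothetical stand-alone variant of ensure_hash, for illustration only.
def normalize_variables(param)
  case param
  when String
    # A blank string means "no variables"; anything else must be JSON.
    param.strip.empty? ? {} : normalize_variables(JSON.parse(param))
  when Hash
    param
  when nil
    {}
  else
    raise ArgumentError, "Unexpected parameter: #{param.inspect}"
  end
end

from_json  = normalize_variables('{"id": 1}')
from_blank = normalize_variables("")
from_nil   = normalize_variables(nil)
```

The recursive call matters: a JSON string that parses to something other than an object (an array, for example) falls through to the `else` branch and raises.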

Constant Summary

SUBSCRIPTION_PREFIX =
"graphql-subscription:"
EVENT_PREFIX =
"graphql-event:"

Instance Method Summary

Methods inherited from GraphQL::Subscriptions

#build_id, #each_subscription_id, #execute, #normalize_name, #trigger, use

Constructor Details

#initialize(serializer: Serialize, **rest) ⇒ ActionCableSubscriptions

Returns a new instance of ActionCableSubscriptions.

Parameters:

  • serializer (<#dump(obj), #load(string)>) (defaults to: Serialize): Used for serializing messages before handing them to .broadcast(msg)



# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 86

def initialize(serializer: Serialize, **rest)
  # A per-process map of subscriptions to deliver.
  # This is provided by Rails, so let's use it
  @subscriptions = Concurrent::Map.new
  @serializer = serializer
  super
end
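
The `serializer:` option is documented as any object responding to `dump(obj)` and `load(string)`. A minimal sketch of such an object follows. `JsonSerializer` is a hypothetical name, and it only round-trips plain JSON-compatible data, which is likely narrower than what the default `Serialize` handles.

```ruby
require "json"

# Hypothetical serializer: it satisfies the documented duck type by
# responding to dump(obj) and load(string).
module JsonSerializer
  def self.dump(obj)
    JSON.generate(obj)
  end

  def self.load(string)
    JSON.parse(string)
  end
end

message  = JsonSerializer.dump({ "id" => 1, "title" => "Hello" })
restored = JsonSerializer.load(message)
```

Assuming options given to `use` are forwarded to this constructor, it would be wired in as `use GraphQL::Subscriptions::ActionCableSubscriptions, serializer: JsonSerializer`.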

Instance Method Details

#delete_subscription(subscription_id) ⇒ Object

The channel was closed; forget about it.



# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 139

def delete_subscription(subscription_id)
  @subscriptions.delete(subscription_id)
end

#deliver(subscription_id, result) ⇒ Object

This subscription was re-evaluated. Send it to the specific stream where this client was waiting.



# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 104

def deliver(subscription_id, result)
  payload = { result: result.to_h, more: true }
  ActionCable.server.broadcast(SUBSCRIPTION_PREFIX + subscription_id, payload)
end
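
To make the shape of the delivered payload concrete, the sketch below mirrors the body of `#deliver` but takes the server as an argument. `RecordingServer` is a hypothetical stand-in for `ActionCable.server` that records broadcasts instead of sending them, and the subscription ID is made up.

```ruby
# Hypothetical stand-in for ActionCable.server; it only records calls.
class RecordingServer
  attr_reader :broadcasts

  def initialize
    @broadcasts = []
  end

  def broadcast(stream, payload)
    @broadcasts << [stream, payload]
  end
end

# Value copied from the constant documented above.
SUBSCRIPTION_PREFIX = "graphql-subscription:"

# Same logic as #deliver, with the server injected for illustration.
def deliver(server, subscription_id, result)
  payload = { result: result.to_h, more: true }
  server.broadcast(SUBSCRIPTION_PREFIX + subscription_id, payload)
end

server = RecordingServer.new
deliver(server, "abc123", { "data" => { "postAdded" => { "id" => 1 } } })
stream, payload = server.broadcasts.first
```

The `more: true` flag matches the `more:` key the channel example transmits for a subscription, so a client sees the same envelope for the initial response and for later updates.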

#execute_all(event, object) ⇒ Object

An event was triggered; push the data over ActionCable. Subscribers will re-evaluate locally.



# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 96

def execute_all(event, object)
  stream = EVENT_PREFIX + event.topic
  message = @serializer.dump(object)
  ActionCable.server.broadcast(stream, message)
end

#read_subscription(subscription_id) ⇒ Object

Return the query from "storage" (in memory).



# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 128

def read_subscription(subscription_id)
  query = @subscriptions[subscription_id]
  {
    query_string: query.query_string,
    variables: query.provided_variables,
    context: query.context.to_h,
    operation_name: query.operation_name,
  }
end
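
The returned hash can be illustrated without loading the library. In this sketch `FakeQuery` is a hypothetical stand-in for the stored query object, exposing only the readers that `#read_subscription` calls, and a plain Hash stands in for the `Concurrent::Map`.

```ruby
# Hypothetical stand-in for the stored query object.
FakeQuery = Struct.new(:query_string, :provided_variables, :context, :operation_name)

# Same logic as #read_subscription, with the storage passed in explicitly.
def read_subscription(subscriptions, subscription_id)
  query = subscriptions[subscription_id]
  {
    query_string: query.query_string,
    variables: query.provided_variables,
    context: query.context.to_h,
    operation_name: query.operation_name,
  }
end

subscriptions = {
  "abc123" => FakeQuery.new(
    "subscription OnPost { postAdded { id } }",
    { "authorId" => 5 },
    { subscription_id: "abc123" },
    "OnPost"
  ),
}

stored = read_subscription(subscriptions, "abc123")
```

As the listing is written, looking up an ID that is no longer stored (for example after `delete_subscription`) yields `nil` and then raises `NoMethodError`, so callers may want to guard against that case.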

#write_subscription(query, events) ⇒ Object

A query was run that subscribed to these events. Store the query in memory in this ActionCable frontend; it will receive notifications when events come in and re-evaluate the query locally.



# File 'lib/graphql/subscriptions/action_cable_subscriptions.rb', line 113

def write_subscription(query, events)
  channel = query.context.fetch(:channel)
  subscription_id = query.context[:subscription_id] ||= build_id
  stream = query.context[:action_cable_stream] ||= SUBSCRIPTION_PREFIX + subscription_id
  channel.stream_from(stream)
  @subscriptions[subscription_id] = query
  events.each do |event|
    channel.stream_from(EVENT_PREFIX + event.topic, coder: ActiveSupport::JSON) do |message|
      execute(subscription_id, event, @serializer.load(message))
      nil
    end
  end
end
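
The interplay of `write_subscription`, `execute_all` and `deliver` can be sketched with an in-memory stand-in for ActionCable. `FakeCable`, the topic string and the subscription ID are all hypothetical. Re-running the query is replaced by building a result hash directly, and JSON stands in for the serializer.

```ruby
require "json"

# Hypothetical in-memory stand-in for ActionCable streams.
class FakeCable
  def initialize
    @handlers = Hash.new { |hash, stream| hash[stream] = [] }
  end

  # Register a callback for a stream, like stream_from with a block.
  def stream_from(stream, &handler)
    @handlers[stream] << handler
  end

  # Invoke every callback registered for the stream.
  def broadcast(stream, message)
    @handlers[stream].each { |handler| handler.call(message) }
  end
end

# Values copied from the constants documented above.
SUBSCRIPTION_PREFIX = "graphql-subscription:"
EVENT_PREFIX = "graphql-event:"

cable = FakeCable.new
delivered = []
subscription_id = "abc123"  # hypothetical ID
topic = ":postAdded:"       # hypothetical topic string

# write_subscription, part 1: listen on the subscription's own stream.
cable.stream_from(SUBSCRIPTION_PREFIX + subscription_id) do |payload|
  delivered << payload
end

# write_subscription, part 2: on each event, re-evaluate and deliver.
cable.stream_from(EVENT_PREFIX + topic) do |message|
  object = JSON.parse(message)
  result = { "data" => { "postAdded" => object } } # stands in for re-running the query
  cable.broadcast(SUBSCRIPTION_PREFIX + subscription_id, { result: result, more: true })
end

# execute_all: serialize the triggered object and broadcast it.
cable.broadcast(EVENT_PREFIX + topic, JSON.generate({ "id" => 1 }))
```

The point of the two-stream design is that the triggering process only broadcasts the serialized object; each process holding a subscription re-evaluates its own stored query and pushes the result to its own client.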