Class: GraphQL::Subscriptions::AnyCableSubscriptions

Inherits:
GraphQL::Subscriptions show all
Extended by:
Forwardable
Defined in:
lib/graphql/subscriptions/anycable_subscriptions.rb

Constant Summary collapse

SUBSCRIPTION_PREFIX =

HASH: Stores subscription data: query, context, …

"subscription:"
FINGERPRINTS_PREFIX =

ZSET: To get fingerprints by topic

"fingerprints:"
SUBSCRIPTIONS_PREFIX =

SET: To get subscriptions by fingerprint

"subscriptions:"
CHANNEL_PREFIX =

SET: Auxiliary structure for whole channel’s subscriptions cleanup

"channel:"

Instance Method Summary collapse

Constructor Details

#initialize(serializer: Serialize, **rest) ⇒ AnyCableSubscriptions

Returns a new instance of AnyCableSubscriptions.

Parameters:

  • serializer (<#dump(obj), #load(string)>) (defaults to: Serialize)

    Used for serializing messages before handing them to `.broadcast(msg)`



66
67
68
69
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 66

# Builds the subscriptions backend.
#
# @param serializer [#dump, #load] kept in @serializer; the methods of this
#   class use it to dump and load the stored subscription context
#   (defaults to Serialize)
# @param rest [Hash] every other keyword option, handed on to the superclass
def initialize(serializer: Serialize, **rest)
  @serializer = serializer
  # Spelled-out equivalent of a bare `super`: all keywords are forwarded,
  # the serializer included.
  super(serializer: serializer, **rest)
end

Instance Method Details

#delete_channel_subscriptions(channel) ⇒ Object

The channel was closed, forget about it and its subscriptions

Raises:

  • (ArgumentError)


179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 179

# Forgets a closed channel: deletes every subscription recorded in the
# channel's Redis set and then the set itself.
#
# @param channel [Object] the channel instance (not a String identifier)
# @raise [ArgumentError] when a String is passed instead of a channel instance
def delete_channel_subscriptions(channel)
  if channel.is_a?(String)
    raise ArgumentError, "Please pass channel instance to #{__method__} in your #unsubscribed method"
  end

  # No id has been stored yet when the disconnect happens before #execute
  channel_id = read_subscription_id(channel)
  return unless channel_id

  with_redis do |redis|
    channel_key = redis_key(CHANNEL_PREFIX) + channel_id

    redis.smembers(channel_key).each { |subscription_id| delete_subscription(subscription_id, redis: redis) }
    redis.del(channel_key)
  end
end

#delete_subscription(subscription_id, redis: AnyCable.redis) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 195

# Removes one subscription and its bookkeeping entries from Redis.
#
# Reads the subscription's stored `events` field (a JSON object of
# topic => fingerprint), removes the subscription from each fingerprint's
# set, decrements each fingerprint's score for its topic, deletes the
# subscription hash, and finally prunes topic entries whose score reached zero.
#
# @param subscription_id [String]
# @param redis [Object] Redis connection to use (defaults to AnyCable.redis)
def delete_subscription(subscription_id, redis: AnyCable.redis)
  subscription_key = redis_key(SUBSCRIPTION_PREFIX) + subscription_id
  stored_events = redis.hget(subscription_key, :events)
  fingerprint_by_topic = stored_events ? JSON.parse(stored_events) : {}

  # topic key => pipelined reply holding the fingerprint's score after the decrement
  remaining_scores = {}

  redis.pipelined do |pipeline|
    fingerprint_by_topic.each do |topic, fingerprint|
      topic_key = redis_key(FINGERPRINTS_PREFIX) + topic

      pipeline.srem(redis_key(SUBSCRIPTIONS_PREFIX) + fingerprint, subscription_id)
      remaining_scores[topic_key] = pipeline.zincrby(topic_key, -1, fingerprint)
    end

    # Drop the subscription record itself
    pipeline.del(subscription_key)
  end

  # Replies are only read here, after the first pipeline has finished.
  # Remove fingerprints that have no subscriptions left.
  redis.pipelined do |pipeline|
    remaining_scores.each do |topic_key, reply|
      next unless reply.value.zero?

      pipeline.zremrangebyscore(topic_key, "-inf", "0")
    end
  end
end

#deliver(stream_key, result) ⇒ Object

This subscription was re-evaluated. Send it to the specific stream where this client was waiting.

Parameters:

  • stream_key (String)
  • result (#to_h)

    result to send to clients



119
120
121
122
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 119

# Sends a re-evaluated result to the given stream.
#
# The result is wrapped as `{result: ..., more: true}`, encoded as JSON and
# handed to `broadcast`.
#
# @param stream_key [String] stream to broadcast to
# @param result [#to_h] result to send to clients
def deliver(stream_key, result)
  message = {result: result.to_h, more: true}
  broadcast(stream_key, message.to_json)
end

#execute(subscription_id, event, object) ⇒ Object

Disable this method as there is no fingerprint (it can be retrieved from subscription though)

Raises:

  • (NotImplementedError)


111
112
113
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 111

# Deliberately disabled: always raises and points callers to #execute_all.
#
# @raise [NotImplementedError] on every call
def execute(subscription_id, event, object)
  message = "Use execute_all method instead of execute to get actual event fingerprint"
  raise(NotImplementedError, message)
end

#execute_all(event, object) ⇒ Object

An event was triggered. Re-evaluate all subscribed queries and push the data over ActionCable.



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 73

# Handles a triggered event: looks up every fingerprint registered for the
# event's topic, loads the subscription ids of each fingerprint, and runs
# #execute_grouped once per fingerprint.
#
# @param event [#topic] the triggered event
# @param object [Object] passed through to #execute_grouped
# @return [Hash{String => Integer}, nil] number of subscription ids per
#   fingerprint, or nil when the topic has no fingerprints
def execute_all(event, object)
  fingerprints = with_redis do |redis|
    redis.zrange(redis_key(FINGERPRINTS_PREFIX) + event.topic, 0, -1)
  end
  return if fingerprints.empty?

  # One SMEMBERS per fingerprint, sent in a single pipeline
  subscription_id_lists = with_redis do |redis|
    redis.pipelined do |pipeline|
      fingerprints.each do |fingerprint|
        pipeline.smembers(redis_key(SUBSCRIPTIONS_PREFIX) + fingerprint)
      end
    end
  end
  ids_by_fingerprint = fingerprints.zip(subscription_id_lists).to_h

  ids_by_fingerprint.each_pair do |fingerprint, subscription_ids|
    execute_grouped(fingerprint, subscription_ids, event, object)
  end

  # Call to +trigger+ returns this. Convenient for playing in console
  ids_by_fingerprint.transform_values(&:size)
end

#execute_grouped(fingerprint, subscription_ids, event, object) ⇒ Object

The fingerprint has told us that this response should be shared by all subscribers, so just run it once, then deliver the result to every subscriber



97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 97

# Evaluates the update once for a fingerprint and delivers the result to the
# stream shared by that fingerprint's subscribers.
#
# @param fingerprint [String]
# @param subscription_ids [Array<String>] candidate subscriptions for the fingerprint
# @param event [Object] passed through to #execute_update
# @param object [Object] passed through to #execute_update
# @return [Object, nil] whatever #deliver returns, or nil when nothing was sent
def execute_grouped(fingerprint, subscription_ids, event, object)
  return if subscription_ids.empty?

  # Pick the first subscription whose record is still present in Redis
  live_subscription_id = with_redis do |redis|
    subscription_ids.find do |candidate_id|
      redis.exists?(redis_key(SUBSCRIPTION_PREFIX) + candidate_id)
    end
  end
  # Every subscription has expired but has not been cleaned up yet
  return unless live_subscription_id

  update = execute_update(live_subscription_id, event, object)

  # The result was calculated _once_; the same payload goes to all subscribers
  deliver(redis_key(SUBSCRIPTIONS_PREFIX) + fingerprint, update) if update
end

#read_subscription(subscription_id) ⇒ Object

Return the query from “storage” (in redis)



163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 163

# Returns the stored query data for a subscription from Redis.
#
# @param subscription_id [String]
# @return [Hash, nil] hash with :query_string, :variables (parsed from JSON),
#   :context (loaded through @serializer) and :operation_name (nil when stored
#   blank); nil when Redis holds no data for the subscription
def read_subscription(subscription_id)
  subscription = with_redis do |redis|
    redis.mapped_hmget(
      "#{redis_key(SUBSCRIPTION_PREFIX)}#{subscription_id}",
      :query_string, :variables, :context, :operation_name
    )
  end

  # Redis returns a hash with all nils for a missing key. Report that as nil
  # rather than handing the empty shell to the caller: a `next` inside `tap`
  # only leaves the tap block, so the all-nil hash would still be returned.
  return if subscription.values.all?(&:nil?)

  subscription[:context] = @serializer.load(subscription[:context])
  subscription[:variables] = JSON.parse(subscription[:variables])
  subscription[:operation_name] = nil if subscription[:operation_name].strip == ""
  subscription
end

#write_subscription(query, events) ⇒ Object

Save query to “storage” (in redis)



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/graphql/subscriptions/anycable_subscriptions.rb', line 125

# Saves a subscription query to Redis and subscribes the channel to the
# stream of each of its events' fingerprints.
#
# @param query [Object] the query; its context must carry :channel and may
#   carry :subscription_id (otherwise an id comes from build_id)
# @param events [Array<#topic, #fingerprint>] events the query subscribes to
# @raise [GraphQL::AnyCable::ChannelConfigurationError] when the context has no :channel
def write_subscription(query, events)
  context = query.context.to_h
  subscription_id = context.delete(:subscription_id) || build_id
  channel = context.delete(:channel)
  raise GraphQL::AnyCable::ChannelConfigurationError unless channel

  # Keep the subscription id in the channel state so it can be cleaned up on disconnect
  write_subscription_id(channel, subscription_id)

  events.each { |event| channel.stream_from(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint) }

  fingerprint_by_topic = events.each_with_object({}) do |event, mapping|
    mapping[event.topic] = event.fingerprint
  end

  data = {
    query_string: query.query_string,
    variables: query.provided_variables.to_json,
    context: @serializer.dump(context.to_h),
    operation_name: query.operation_name.to_s,
    events: fingerprint_by_topic.to_json
  }

  with_redis do |redis|
    redis.multi do |transaction|
      channel_key = redis_key(CHANNEL_PREFIX) + subscription_id
      subscription_key = redis_key(SUBSCRIPTION_PREFIX) + subscription_id

      transaction.sadd(channel_key, [subscription_id])
      transaction.mapped_hmset(subscription_key, data)

      events.each do |event|
        transaction.zincrby(redis_key(FINGERPRINTS_PREFIX) + event.topic, 1, event.fingerprint)
        transaction.sadd(redis_key(SUBSCRIPTIONS_PREFIX) + event.fingerprint, [subscription_id])
      end

      # Expiration is only applied when it is configured
      ttl = config.subscription_expiration_seconds
      if ttl
        transaction.expire(channel_key, ttl)
        transaction.expire(subscription_key, ttl)
      end
    end
  end
end