Class: Pakyow::Realtime::Server::Adapters::Redis Private

Inherits:
Object
  • Object
show all
Defined in:
lib/pakyow/realtime/server/adapters/redis.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Manages websocket channels in redis.

Use this in production.

Defined Under Namespace

Classes: Buffer, Subscriber

Constant Summary collapse

KEY_PART_SEPARATOR =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

"/"
KEY_PREFIX =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

"realtime"
INFINITY =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

"+inf"
PUBSUB_PREFIX =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

"pubsub"

Instance Method Summary collapse

Constructor Details

#initialize(server, config) ⇒ Redis

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of Redis.



24
25
26
27
28
29
30
# File 'lib/pakyow/realtime/server/adapters/redis.rb', line 24

def initialize(server, config)
  @server, @config = server, config
  @prefix = [@config[:key_prefix], KEY_PREFIX].join(KEY_PART_SEPARATOR)

  connect
  cleanup
end

Instance Method Details

#connectObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/pakyow/realtime/server/adapters/redis.rb', line 32

def connect
  @redis = ConnectionPool.new(**@config[:pool]) {
    ::Redis.new(@config[:connection])
  }

  @buffer = Buffer.new(@redis, pubsub_channel)
  @subscriber = Subscriber.new(::Redis.new(@config[:connection]), pubsub_channel) do |payload|
    channel, message = Marshal.restore(payload).values_at(:channel, :message)
    @server.transmit_message_to_connection_ids(message, socket_ids_for_channel(channel), raw: true)
  end
end

#current!(socket_id, socket_instance_id) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



120
121
122
123
124
# File 'lib/pakyow/realtime/server/adapters/redis.rb', line 120

def current!(socket_id, socket_instance_id)
  @redis.with do |redis|
    redis.set(key_socket_instance_id_by_socket_id(socket_id), socket_instance_id)
  end
end

#current?(socket_id, socket_instance_id) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (Boolean)


126
127
128
129
130
# File 'lib/pakyow/realtime/server/adapters/redis.rb', line 126

def current?(socket_id, socket_instance_id)
  @redis.with do |redis|
    redis.get(key_socket_instance_id_by_socket_id(socket_id)) == socket_instance_id.to_s
  end
end

#disconnectObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



44
45
46
# File 'lib/pakyow/realtime/server/adapters/redis.rb', line 44

def disconnect
  @subscriber.disconnect
end

#expire(socket_id, seconds) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/pakyow/realtime/server/adapters/redis.rb', line 89

def expire(socket_id, seconds)
  time_expire = Time.now.to_i + seconds
  channels = channels_for_socket_id(socket_id)

  @redis.with do |redis|
    redis.multi do |transaction|
      channels.each do |channel|
        transaction.zadd(key_socket_ids_by_channel(channel), time_expire, socket_id)
      end

      transaction.expireat(key_channels_by_socket_id(socket_id), time_expire + 1)
      transaction.expireat(key_socket_instance_id_by_socket_id(socket_id), time_expire + 1)
    end
  end
end

#persist(socket_id) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/pakyow/realtime/server/adapters/redis.rb', line 105

def persist(socket_id)
  channels = channels_for_socket_id(socket_id)

  @redis.with do |redis|
    redis.multi do |transaction|
      channels.each do |channel|
        transaction.zadd(key_socket_ids_by_channel(channel), INFINITY, socket_id)
      end

      transaction.persist(key_channels_by_socket_id(socket_id))
      transaction.persist(key_socket_instance_id_by_socket_id(socket_id))
    end
  end
end

#socket_subscribe(socket_id, *channels) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



48
49
50
51
52
53
54
55
56
57
58
# File 'lib/pakyow/realtime/server/adapters/redis.rb', line 48

def socket_subscribe(socket_id, *channels)
  @redis.with do |redis|
    redis.multi do |transaction|
      channels.each do |channel|
        channel = channel.to_s
        transaction.zadd(key_socket_ids_by_channel(channel), INFINITY, socket_id)
        transaction.zadd(key_channels_by_socket_id(socket_id), INFINITY, channel)
      end
    end
  end
end

#socket_unsubscribe(*channels) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/pakyow/realtime/server/adapters/redis.rb', line 60

def socket_unsubscribe(*channels)
  @redis.with do |redis|
    channels.each do |channel|
      channel = channel.to_s

      # Channel could contain a wildcard, so this takes some work...
      redis.scan_each(match: key_socket_ids_by_channel(channel)) do |key|
        channel = key.split("channel:", 2)[1]

        socket_ids = redis.zrangebyscore(
          key, Time.now.to_i, INFINITY
        )

        redis.multi do |transaction|
          transaction.del(key)

          socket_ids.each do |socket_id|
            transaction.zrem(key_channels_by_socket_id(socket_id), channel)
          end
        end
      end
    end
  end
end

#subscription_broadcast(channel, message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



85
86
87
# File 'lib/pakyow/realtime/server/adapters/redis.rb', line 85

def subscription_broadcast(channel, message)
  @buffer << Marshal.dump(channel: channel, message: { payload: message }.to_json)
end