Class: ActionCable::SubscriptionAdapter::EnhancedPostgresql

Inherits:
PostgreSQL
  • Object
show all
Defined in:
lib/action_cable/subscription_adapter/enhanced_postgresql.rb,
lib/railtie.rb

Defined Under Namespace

Classes: Listener, Railtie

Constant Summary collapse

MAX_NOTIFY_SIZE =

PostgreSQL documents the NOTIFY payload limit as 8000 bytes, but there appears to be some overhead in transit, so a slightly lower limit is used.

7997
LARGE_PAYLOAD_PREFIX =
"__large_payload:"
INSERTS_PER_DELETE =

Run the cleanup DELETE query once every N inserts.

100
LARGE_PAYLOADS_TABLE =
"action_cable_large_payloads"
CREATE_LARGE_TABLE_QUERY =
<<~SQL
  CREATE UNLOGGED TABLE IF NOT EXISTS #{LARGE_PAYLOADS_TABLE} (
    id SERIAL PRIMARY KEY,
    payload TEXT NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
  )
SQL
CREATE_CREATED_AT_INDEX_QUERY =
<<~SQL
  CREATE INDEX IF NOT EXISTS index_action_cable_large_payloads_on_created_at
  ON #{LARGE_PAYLOADS_TABLE} (created_at)
SQL
INSERT_LARGE_PAYLOAD_QUERY =
"INSERT INTO #{LARGE_PAYLOADS_TABLE} (payload, created_at) VALUES ($1, CURRENT_TIMESTAMP) RETURNING id"
SELECT_LARGE_PAYLOAD_QUERY =
"SELECT payload FROM #{LARGE_PAYLOADS_TABLE} WHERE id = $1"
DELETE_LARGE_PAYLOAD_QUERY =
"DELETE FROM #{LARGE_PAYLOADS_TABLE} WHERE created_at < CURRENT_TIMESTAMP - INTERVAL '2 minutes'"

Instance Method Summary collapse

Constructor Details

#initialize ⇒ EnhancedPostgresql

Returns a new instance of EnhancedPostgresql.



29
30
31
32
33
34
# File 'lib/action_cable/subscription_adapter/enhanced_postgresql.rb', line 29

# Builds the adapter and caches connection settings from the cable config.
#
# Reads +url+ and +connection_pool_size+ from <tt>@server.config.cable</tt>.
# When no pool size is configured, the RAILS_MAX_THREADS environment variable
# is used, and finally a default of 5.
#
# The resolved size is coerced with Integer() because environment variables
# are always Strings while the configured value and the default are Integers;
# without the coercion @connection_pool_size could be either type.
#
# @raise [ArgumentError] if the resolved pool size is not a valid integer
def initialize(*)
  super

  cable_config = @server.config.cable

  @url = cable_config[:url]
  @connection_pool_size = Integer(cable_config[:connection_pool_size] || ENV["RAILS_MAX_THREADS"] || 5)
end

Instance Method Details

#broadcast(channel, payload) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/action_cable/subscription_adapter/enhanced_postgresql.rb', line 36

# Publishes +payload+ on +channel+ using PostgreSQL NOTIFY.
#
# Payloads whose escaped form exceeds MAX_NOTIFY_SIZE bytes are stored via
# insert_large_payload, and the notification carries LARGE_PAYLOAD_PREFIX
# followed by the encrypted row id instead of the payload itself.
#
# @param channel [String] channel name, passed through channel_with_prefix
# @param payload [String] message body to deliver
# @return [Object] whatever with_broadcast_connection returns for the block
def broadcast(channel, payload)
  prefixed_channel = channel_with_prefix(channel)

  with_broadcast_connection do |pg_conn|
    notify_channel = pg_conn.escape_identifier(channel_identifier(prefixed_channel))
    # Escaped copy is only for interpolation into the NOTIFY string literal.
    notify_payload = pg_conn.escape_string(payload)

    # The threshold is checked against the escaped form, i.e. the bytes that
    # are actually placed inside the NOTIFY statement.
    if notify_payload.bytesize > MAX_NOTIFY_SIZE
      # Store the RAW payload. INSERT_LARGE_PAYLOAD_QUERY takes the value
      # through a $1 placeholder, so insert_large_payload is expected to bind
      # it as a parameter; handing it the SQL-escaped text would persist
      # doubled quotes and corrupt the message for subscribers.
      payload_id = insert_large_payload(pg_conn, payload)

      # Piggyback cleanup of old rows on every INSERTS_PER_DELETE-th insert.
      if payload_id % INSERTS_PER_DELETE == 0
        pg_conn.exec(DELETE_LARGE_PAYLOAD_QUERY)
      end

      # Encrypt payload_id to prevent simple integer ID spoofing
      encrypted_payload_id = payload_encryptor.encrypt_and_sign(payload_id)

      notify_payload = pg_conn.escape_string("#{LARGE_PAYLOAD_PREFIX}#{encrypted_payload_id}")
    end

    pg_conn.exec("NOTIFY #{notify_channel}, '#{notify_payload}'")
  end
end

#payload_encryptor ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/action_cable/subscription_adapter/enhanced_postgresql.rb', line 60

# Returns the memoized ActiveSupport::MessageEncryptor used to encrypt and
# sign large-payload ids.
#
# The secret is resolved in this order: the +payload_encryptor_secret+ entry
# of the cable config, Rails.application.secret_key_base (only when Rails is
# defined), then the SECRET_KEY_BASE environment variable. It is hashed with
# SHA-256 to produce the 32-byte key handed to the encryptor.
#
# @raise [ArgumentError] when none of the sources provides a secret
def payload_encryptor
  return @payload_encryptor if @payload_encryptor

  key_source = @server.config.cable[:payload_encryptor_secret]
  key_source = Rails.application.secret_key_base if !key_source && defined?(Rails)
  key_source = ENV["SECRET_KEY_BASE"] unless key_source

  unless key_source
    raise ArgumentError, "Missing payload_encryptor_secret configuration for ActionCable EnhancedPostgresql adapter. You need to either explicitly configure it in cable.yml or set the SECRET_KEY_BASE environment variable."
  end

  derived_key = Digest::SHA256.digest(key_source)
  @payload_encryptor = ActiveSupport::MessageEncryptor.new(derived_key)
end

#with_broadcast_connection(&block) ⇒ Object



73
74
75
76
77
78
79
# File 'lib/action_cable/subscription_adapter/enhanced_postgresql.rb', line 73

# Yields a PG connection to use for broadcasting.
#
# When a +url+ was configured, a connection is checked out of connection_pool
# for the duration of the block; otherwise the parent adapter's
# implementation is used.
def with_broadcast_connection(&block)
  if @url
    connection_pool.with { |pooled_conn| yield pooled_conn }
  else
    super
  end
end

#with_subscriptions_connection(&block) ⇒ Object

Called from the Listener thread



82
83
84
85
86
87
88
89
90
# File 'lib/action_cable/subscription_adapter/enhanced_postgresql.rb', line 82

# Yields a dedicated PG connection for the listener.
#
# When a +url+ was configured, a fresh PG::Connection is opened to it, its
# application_name is set to the adapter's identifier, and it is closed once
# the block finishes, whether it returns or raises. Without a +url+ the
# parent adapter's implementation is used.
def with_subscriptions_connection(&block)
  if @url
    listener_conn = nil
    begin
      listener_conn = PG::Connection.new(@url)
      app_name = listener_conn.escape_identifier(identifier)
      listener_conn.exec("SET application_name = #{app_name}")
      yield listener_conn
    ensure
      # nil when PG::Connection.new itself raised
      listener_conn&.close
    end
  else
    super
  end
end