Class: ActionCable::SubscriptionAdapter::EnhancedPostgresql
- Inherits:
- Object
- PostgreSQL
- ActionCable::SubscriptionAdapter::EnhancedPostgresql
- Defined in:
- lib/action_cable/subscription_adapter/enhanced_postgresql.rb,
lib/railtie.rb
Defined Under Namespace
Constant Summary
- MAX_NOTIFY_SIZE =
PostgreSQL documents the NOTIFY payload limit as 8000 bytes, but there appears to be some overhead in transit, so a slightly lower value is used.
7997
- LARGE_PAYLOAD_PREFIX =
"__large_payload:"
- INSERTS_PER_DELETE =
Run the DELETE query once every N inserts.
100
- LARGE_PAYLOADS_TABLE =
"action_cable_large_payloads"
- CREATE_LARGE_TABLE_QUERY =
<<~SQL
  CREATE UNLOGGED TABLE IF NOT EXISTS #{LARGE_PAYLOADS_TABLE} (
    id SERIAL PRIMARY KEY,
    payload TEXT NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
  )
SQL
- CREATE_CREATED_AT_INDEX_QUERY =
<<~SQL
  CREATE INDEX IF NOT EXISTS index_action_cable_large_payloads_on_created_at
  ON #{LARGE_PAYLOADS_TABLE} (created_at)
SQL
- INSERT_LARGE_PAYLOAD_QUERY =
"INSERT INTO #{LARGE_PAYLOADS_TABLE} (payload, created_at) VALUES ($1, CURRENT_TIMESTAMP) RETURNING id"
- SELECT_LARGE_PAYLOAD_QUERY =
"SELECT payload FROM #{LARGE_PAYLOADS_TABLE} WHERE id = $1"
- DELETE_LARGE_PAYLOAD_QUERY =
"DELETE FROM #{LARGE_PAYLOADS_TABLE} WHERE created_at < CURRENT_TIMESTAMP - INTERVAL '2 minutes'"
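The size and cleanup constants above can be illustrated with a minimal sketch. The helper names `large_payload?` and `cleanup_due?` are hypothetical and not part of the adapter; the sketch only assumes the comparison and modulo check that #broadcast performs.

```ruby
module NotifyLimitsSketch
  MAX_NOTIFY_SIZE = 7997
  INSERTS_PER_DELETE = 100

  # Hypothetical helper: true when the payload is too large for NOTIFY.
  # The check is on bytes, not characters.
  def self.large_payload?(payload)
    payload.bytesize > MAX_NOTIFY_SIZE
  end

  # Hypothetical helper: true when the returned row id should trigger cleanup.
  def self.cleanup_due?(payload_id)
    payload_id % INSERTS_PER_DELETE == 0
  end
end

ascii     = "a" * 7997        # 7997 characters, 7997 bytes
multibyte = "\u00e9" * 4000   # 4000 characters, 8000 bytes in UTF-8

p NotifyLimitsSketch.large_payload?(ascii)      # → false
p NotifyLimitsSketch.large_payload?(multibyte)  # → true
p (1..300).select { |id| NotifyLimitsSketch.cleanup_due?(id) } # → [100, 200, 300]
```

Because the limit is measured in bytes, a multibyte payload can cross the threshold with far fewer than 7997 characters.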
Instance Method Summary
- #broadcast(channel, payload) ⇒ Object
- #initialize ⇒ EnhancedPostgresql (constructor)
A new instance of EnhancedPostgresql.
- #payload_encryptor ⇒ Object
- #with_broadcast_connection(&block) ⇒ Object
- #with_subscriptions_connection(&block) ⇒ Object
Called from the Listener thread.
Constructor Details
#initialize ⇒ EnhancedPostgresql
Returns a new instance of EnhancedPostgresql.
# File 'lib/action_cable/subscription_adapter/enhanced_postgresql.rb', line 29

def initialize(*)
  super
  @url = @server.config.cable[:url]
  @connection_pool_size = @server.config.cable[:connection_pool_size] || ENV["RAILS_MAX_THREADS"] || 5
end
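The pool size fallback in the constructor can be sketched in isolation. `resolve_pool_size` is a hypothetical helper that takes plain hashes in place of `@server.config.cable` and `ENV`; it only mirrors the `||` chain shown above.

```ruby
# Hypothetical stand-in for the precedence used in #initialize:
# explicit cable config first, then RAILS_MAX_THREADS, then 5.
def resolve_pool_size(cable_config, env)
  cable_config[:connection_pool_size] || env["RAILS_MAX_THREADS"] || 5
end

p resolve_pool_size({ connection_pool_size: 10 }, { "RAILS_MAX_THREADS" => "3" }) # → 10
p resolve_pool_size({}, { "RAILS_MAX_THREADS" => "3" })                           # → "3"
p resolve_pool_size({}, {})                                                       # → 5
```

Note that a value taken from the environment arrives as a String. Whether it is converted to an Integer elsewhere is not shown in this listing.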
Instance Method Details
#broadcast(channel, payload) ⇒ Object
# File 'lib/action_cable/subscription_adapter/enhanced_postgresql.rb', line 36

def broadcast(channel, payload)
  channel = channel_with_prefix(channel)

  with_broadcast_connection do |pg_conn|
    channel = pg_conn.escape_identifier(channel_identifier(channel))
    payload = pg_conn.escape_string(payload)

    if payload.bytesize > MAX_NOTIFY_SIZE
      payload_id = insert_large_payload(pg_conn, payload)

      if payload_id % INSERTS_PER_DELETE == 0
        pg_conn.exec(DELETE_LARGE_PAYLOAD_QUERY)
      end

      # Encrypt payload_id to prevent simple integer ID spoofing
      encrypted_payload_id = payload_encryptor.encrypt_and_sign(payload_id)

      payload = "#{LARGE_PAYLOAD_PREFIX}#{encrypted_payload_id}"
    end

    pg_conn.exec("NOTIFY #{channel}, '#{payload}'")
  end
end
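The large-payload indirection in #broadcast can be sketched without a database. Everything here is illustrative: `FakePayloadStore` is a hypothetical in-memory stand-in for the `action_cable_large_payloads` table, the `incoming` side is an assumption about how a receiver might resolve the reference, and the encryption of the id is deliberately left out to keep the flow visible.

```ruby
# Hypothetical in-memory stand-in for the large payloads table.
class FakePayloadStore
  def initialize
    @rows = {}
    @next_id = 0
  end

  # Mirrors INSERT ... RETURNING id
  def insert(payload)
    @next_id += 1
    @rows[@next_id] = payload
    @next_id
  end

  # Mirrors SELECT payload ... WHERE id = $1
  def fetch(id)
    @rows[id]
  end
end

module LargePayloadSketch
  MAX_NOTIFY_SIZE = 7997
  LARGE_PAYLOAD_PREFIX = "__large_payload:"

  # What would go over NOTIFY: the payload itself, or a prefixed reference.
  # The real adapter encrypts the id first; this sketch sends it in the clear.
  def self.outgoing(store, payload)
    return payload if payload.bytesize <= MAX_NOTIFY_SIZE
    "#{LARGE_PAYLOAD_PREFIX}#{store.insert(payload)}"
  end

  # Assumed receiving side: turn a reference back into the stored payload.
  def self.incoming(store, message)
    return message unless message.start_with?(LARGE_PAYLOAD_PREFIX)
    store.fetch(Integer(message.delete_prefix(LARGE_PAYLOAD_PREFIX)))
  end
end

store = FakePayloadStore.new
large = "x" * 10_000
wire  = LargePayloadSketch.outgoing(store, large)

puts wire                                              # → __large_payload:1
puts LargePayloadSketch.incoming(store, wire) == large # → true
puts LargePayloadSketch.outgoing(store, "hello")       # → hello
```

The point of the indirection is that the message sent through NOTIFY stays small no matter how large the original payload is.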
#payload_encryptor ⇒ Object
# File 'lib/action_cable/subscription_adapter/enhanced_postgresql.rb', line 60

def payload_encryptor
  @payload_encryptor ||= begin
    secret = @server.config.cable[:payload_encryptor_secret]
    secret ||= Rails.application.secret_key_base if defined? Rails
    secret ||= ENV["SECRET_KEY_BASE"]

    raise ArgumentError, "Missing payload_encryptor_secret configuration for ActionCable EnhancedPostgresql adapter. You need to either explicitly configure it in cable.yml or set the SECRET_KEY_BASE environment variable." unless secret

    secret_32_byte = Digest::SHA256.digest(secret)
    ActiveSupport::MessageEncryptor.new(secret_32_byte)
  end
end
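The key derivation step can be illustrated with the standard library alone. This is a sketch, not the adapter's encryptor: it uses OpenSSL directly as a stand-in for ActiveSupport::MessageEncryptor, the choice of `aes-256-gcm` is an assumption, and `derive_key`, `encrypt_id`, and `decrypt_id` are hypothetical helpers. What it shows is that a SHA-256 digest always yields the 32 bytes an AES-256 cipher requires, whatever the length of the secret.

```ruby
require "digest"
require "openssl"

# Same derivation as payload_encryptor: hash the secret down to 32 bytes.
def derive_key(secret)
  Digest::SHA256.digest(secret)
end

# Hypothetical helper: encrypt an integer id with AES-256-GCM (assumed cipher).
def encrypt_id(key, id)
  cipher = OpenSSL::Cipher.new("aes-256-gcm")
  cipher.encrypt
  cipher.key = key
  iv = cipher.random_iv
  cipher.auth_data = ""
  ciphertext = cipher.update(id.to_s) + cipher.final
  [iv, ciphertext, cipher.auth_tag]
end

# Hypothetical helper: decrypt and authenticate, returning the integer id.
def decrypt_id(key, iv, ciphertext, tag)
  decipher = OpenSSL::Cipher.new("aes-256-gcm")
  decipher.decrypt
  decipher.key = key
  decipher.iv = iv
  decipher.auth_tag = tag
  decipher.auth_data = ""
  Integer(decipher.update(ciphertext) + decipher.final)
end

key = derive_key("any-length application secret") # hypothetical secret
puts key.bytesize # → 32

iv, ciphertext, tag = encrypt_id(key, 42)
puts decrypt_id(key, iv, ciphertext, tag) # → 42
```

Hashing the secret means a short or long `payload_encryptor_secret` both produce a usable key, at the cost of the key being only as strong as the secret itself.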
#with_broadcast_connection(&block) ⇒ Object
# File 'lib/action_cable/subscription_adapter/enhanced_postgresql.rb', line 73

def with_broadcast_connection(&block)
  return super unless @url

  connection_pool.with do |pg_conn|
    yield pg_conn
  end
end
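The check-out and check-in pattern behind `connection_pool.with` can be sketched with a thread-safe queue. `TinyPool` is a hypothetical, simplified pool written for illustration; how the adapter actually builds `connection_pool` is not shown in this listing.

```ruby
# Hypothetical minimal pool: a fixed set of objects handed out one at a time.
class TinyPool
  def initialize(size, &factory)
    @queue = Queue.new
    size.times { @queue << factory.call }
  end

  # Check out a connection, yield it, and always return it to the pool,
  # even if the block raises.
  def with
    conn = @queue.pop
    yield conn
  ensure
    @queue << conn if conn
  end

  # Number of connections currently idle.
  def available
    @queue.size
  end
end

pool = TinyPool.new(2) { Object.new } # Object.new stands in for a PG connection

pool.with do |conn|
  puts pool.available # → 1
end
puts pool.available   # → 2
```

When no `url` is configured, the adapter skips this path entirely and defers to the parent class via `super`.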
#with_subscriptions_connection(&block) ⇒ Object
Called from the Listener thread.
# File 'lib/action_cable/subscription_adapter/enhanced_postgresql.rb', line 82

def with_subscriptions_connection(&block)
  return super unless @url

  pg_conn = PG::Connection.new(@url)
  pg_conn.exec("SET application_name = #{pg_conn.escape_identifier(identifier)}")
  yield pg_conn
ensure
  pg_conn&.close
end