Class: ActionCable::Server::Base
- Inherits:
-
Object
- Object
- ActionCable::Server::Base
- Includes:
- Broadcasting, Connections
- Defined in:
- lib/action_cable/server/base.rb
Overview
# Action Cable Server Base
A singleton ActionCable::Server instance is available via ActionCable.server. It’s used by the Rack process that starts the Action Cable server, but is also used by the user to reach the RemoteConnections object, which is used for finding and disconnecting connections across all servers.
Also, this is the server instance used for broadcasting. See Broadcasting for more information.
Constant Summary
Constants included from Connections
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#mutex ⇒ Object
readonly
Returns the value of attribute mutex.
Class Method Summary collapse
Instance Method Summary collapse
-
#allow_request_origin?(env) ⇒ Boolean
Check if the request origin is allowed to connect to the Action Cable server.
-
#call(env) ⇒ Object
Called by Rack to set up the server.
-
#connection_identifiers ⇒ Object
All of the identifiers applied to the connection class associated with this server.
-
#disconnect(identifiers) ⇒ Object
Disconnect all the connections identified by ‘identifiers` on this server or any others via RemoteConnections.
- #event_loop ⇒ Object
-
#executor ⇒ Object
Executor is used by various actions within Action Cable (e.g., pub/sub operations) to run code asynchronously.
-
#initialize(config: self.class.config) ⇒ Base
constructor
A new instance of Base.
-
#new_tagged_logger(request = nil, &block) ⇒ Object
Tags are declared in the server but computed in the connection.
-
#pubsub ⇒ Object
Adapter used for all streams/broadcasting.
-
#remote_connections ⇒ Object
Gateway to RemoteConnections.
- #restart ⇒ Object
-
#worker_pool ⇒ Object
The worker pool is where we run connection callbacks and channel actions.
Methods included from Connections
#add_connection, #connections, #open_connections_statistics, #remove_connection, #setup_heartbeat_timer
Methods included from Broadcasting
Constructor Details
#initialize(config: self.class.config) ⇒ Base
Returns a new instance of Base.
54 55 56 57 58 |
# File 'lib/action_cable/server/base.rb', line 54 def initialize(config: self.class.config) @config = config @mutex = Monitor.new @remote_connections = @event_loop = @worker_pool = @executor = @pubsub = nil end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
47 48 49 |
# File 'lib/action_cable/server/base.rb', line 47 def config @config end |
#mutex ⇒ Object (readonly)
Returns the value of attribute mutex.
52 53 54 |
# File 'lib/action_cable/server/base.rb', line 52 def mutex @mutex end |
Class Method Details
.logger ⇒ Object
49 |
# File 'lib/action_cable/server/base.rb', line 49 def self.logger; config.logger; end |
Instance Method Details
#allow_request_origin?(env) ⇒ Boolean
Check if the request origin is allowed to connect to the Action Cable server.
146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/action_cable/server/base.rb', line 146 def allow_request_origin?(env) return true if config.disable_request_forgery_protection proto = Rack::Request.new(env).ssl? ? "https" : "http" if config.allow_same_origin_as_host && env["HTTP_ORIGIN"] == "#{proto}://#{env['HTTP_HOST']}" true elsif Array(config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env["HTTP_ORIGIN"] } true else logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}") false end end |
#call(env) ⇒ Object
Called by Rack to set up the server.
61 62 63 64 65 |
# File 'lib/action_cable/server/base.rb', line 61 def call(env) return config.health_check_application.call(env) if env["PATH_INFO"] == config.health_check_path setup_heartbeat_timer Socket.new(self, env).process end |
#connection_identifiers ⇒ Object
All of the identifiers applied to the connection class associated with this server.
134 135 136 |
# File 'lib/action_cable/server/base.rb', line 134 def connection_identifiers config.connection_class.call.identifiers end |
#disconnect(identifiers) ⇒ Object
Disconnect all the connections identified by ‘identifiers` on this server or any others via RemoteConnections.
69 70 71 |
# File 'lib/action_cable/server/base.rb', line 69 def disconnect(identifiers) remote_connections.where(identifiers).disconnect end |
#event_loop ⇒ Object
98 99 100 |
# File 'lib/action_cable/server/base.rb', line 98 def event_loop @event_loop || @mutex.synchronize { @event_loop ||= StreamEventLoop.new } end |
#executor ⇒ Object
Executor is used by various actions within Action Cable (e.g., pub/sub operations) to run code asynchronously.
123 124 125 |
# File 'lib/action_cable/server/base.rb', line 123 def executor @executor || @mutex.synchronize { @executor ||= ThreadedExecutor.new(max_size: config.executor_pool_size) } end |
#new_tagged_logger(request = nil, &block) ⇒ Object
Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags. You can pass request object either directly or via block to lazily evaluate it.
140 141 142 143 |
# File 'lib/action_cable/server/base.rb', line 140 def new_tagged_logger(request = nil, &block) TaggedLoggerProxy.new logger, tags: config..map { |tag| tag.respond_to?(:call) ? tag.call(request ||= block.call) : tag.to_s.camelize } end |
#pubsub ⇒ Object
Adapter used for all streams/broadcasting.
128 129 130 |
# File 'lib/action_cable/server/base.rb', line 128 def pubsub @pubsub || (executor && @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) }) end |
#remote_connections ⇒ Object
Gateway to RemoteConnections. See that class for details.
94 95 96 |
# File 'lib/action_cable/server/base.rb', line 94 def remote_connections @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) } end |
#restart ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/action_cable/server/base.rb', line 73 def restart connections.each do |connection| connection.close(reason: ActionCable::INTERNAL[:disconnect_reasons][:server_restart]) end @mutex.synchronize do # Shutdown the worker pool @worker_pool.halt if @worker_pool @worker_pool = nil # Shutdown the executor @executor.shutdown if @executor @executor = nil # Shutdown the pub/sub adapter @pubsub.shutdown if @pubsub @pubsub = nil end end |
#worker_pool ⇒ Object
The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server’s main thread. The worker pool is an executor service that’s backed by a pool of threads working from a task queue. The thread pool size maxes out at 4 worker threads by default. Tune the size yourself with ‘config.action_cable.worker_pool_size`.
Using Active Record, Redis, etc within your channel actions means you’ll get a separate connection from each thread in the worker pool. Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database connections.
Also, ensure that your database connection pool size is as least as large as your worker pool size. Otherwise, workers may oversubscribe the database connection pool and block while they wait for other workers to release their connections. Use a smaller worker pool or a larger database connection pool instead.
118 119 120 |
# File 'lib/action_cable/server/base.rb', line 118 def worker_pool @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) } end |