Class: ActionCable::Server::Base

Inherits:
Object
  • Object
show all
Includes:
Broadcasting, Connections
Defined in:
lib/action_cable/server/base.rb

Overview

# Action Cable Server Base

A singleton ActionCable::Server instance is available via ActionCable.server. It’s used by the Rack process that starts the Action Cable server, but is also used by the user to reach the RemoteConnections object, which is used for finding and disconnecting connections across all servers.

Also, this is the server instance used for broadcasting. See Broadcasting for more information.

Constant Summary

Constants included from Connections

Connections::BEAT_INTERVAL

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Connections

#add_connection, #connections, #open_connections_statistics, #remove_connection, #setup_heartbeat_timer

Methods included from Broadcasting

#broadcast, #broadcaster_for

Constructor Details

#initialize(config: self.class.config) ⇒ Base

Returns a new instance of Base.



54
55
56
57
58
# File 'lib/action_cable/server/base.rb', line 54

def initialize(config: self.class.config)
  @config = config
  @mutex = Monitor.new
  @remote_connections = @event_loop = @worker_pool = @executor = @pubsub = nil
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



47
48
49
# File 'lib/action_cable/server/base.rb', line 47

def config
  @config
end

#mutexObject (readonly)

Returns the value of attribute mutex.



52
53
54
# File 'lib/action_cable/server/base.rb', line 52

def mutex
  @mutex
end

Class Method Details

.loggerObject



49
# File 'lib/action_cable/server/base.rb', line 49

def self.logger; config.logger; end

Instance Method Details

#allow_request_origin?(env) ⇒ Boolean

Check if the request origin is allowed to connect to the Action Cable server.

Returns:

  • (Boolean)


146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/action_cable/server/base.rb', line 146

def allow_request_origin?(env)
  return true if config.disable_request_forgery_protection

  proto = Rack::Request.new(env).ssl? ? "https" : "http"
  if config.allow_same_origin_as_host && env["HTTP_ORIGIN"] == "#{proto}://#{env['HTTP_HOST']}"
    true
  elsif Array(config.allowed_request_origins).any? { |allowed_origin|  allowed_origin === env["HTTP_ORIGIN"] }
    true
  else
    logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}")
    false
  end
end

#call(env) ⇒ Object

Called by Rack to set up the server.



61
62
63
64
65
# File 'lib/action_cable/server/base.rb', line 61

def call(env)
  return config.health_check_application.call(env) if env["PATH_INFO"] == config.health_check_path
  setup_heartbeat_timer
  Socket.new(self, env).process
end

#connection_identifiersObject

All of the identifiers applied to the connection class associated with this server.



134
135
136
# File 'lib/action_cable/server/base.rb', line 134

def connection_identifiers
  config.connection_class.call.identifiers
end

#disconnect(identifiers) ⇒ Object

Disconnect all the connections identified by ‘identifiers` on this server or any others via RemoteConnections.



69
70
71
# File 'lib/action_cable/server/base.rb', line 69

def disconnect(identifiers)
  remote_connections.where(identifiers).disconnect
end

#event_loopObject



98
99
100
# File 'lib/action_cable/server/base.rb', line 98

def event_loop
  @event_loop || @mutex.synchronize { @event_loop ||= StreamEventLoop.new }
end

#executorObject

Executor is used by various actions within Action Cable (e.g., pub/sub operations) to run code asynchronously.



123
124
125
# File 'lib/action_cable/server/base.rb', line 123

def executor
  @executor || @mutex.synchronize { @executor ||= ThreadedExecutor.new(max_size: config.executor_pool_size) }
end

#new_tagged_logger(request = nil, &block) ⇒ Object

Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags. You can pass request object either directly or via block to lazily evaluate it.



140
141
142
143
# File 'lib/action_cable/server/base.rb', line 140

def new_tagged_logger(request = nil, &block)
  TaggedLoggerProxy.new logger,
    tags: config.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request ||= block.call) : tag.to_s.camelize }
end

#pubsubObject

Adapter used for all streams/broadcasting.



128
129
130
# File 'lib/action_cable/server/base.rb', line 128

def pubsub
  @pubsub || (executor && @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) })
end

#remote_connectionsObject

Gateway to RemoteConnections. See that class for details.



94
95
96
# File 'lib/action_cable/server/base.rb', line 94

def remote_connections
  @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
end

#restartObject



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/action_cable/server/base.rb', line 73

def restart
  connections.each do |connection|
    connection.close(reason: ActionCable::INTERNAL[:disconnect_reasons][:server_restart])
  end

  @mutex.synchronize do
    # Shutdown the worker pool
    @worker_pool.halt if @worker_pool
    @worker_pool = nil

    # Shutdown the executor
    @executor.shutdown if @executor
    @executor = nil

    # Shutdown the pub/sub adapter
    @pubsub.shutdown if @pubsub
    @pubsub = nil
  end
end

#worker_poolObject

The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server’s main thread. The worker pool is an executor service that’s backed by a pool of threads working from a task queue. The thread pool size maxes out at 4 worker threads by default. Tune the size yourself with ‘config.action_cable.worker_pool_size`.

Using Active Record, Redis, etc within your channel actions means you’ll get a separate connection from each thread in the worker pool. Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database connections.

Also, ensure that your database connection pool size is as least as large as your worker pool size. Otherwise, workers may oversubscribe the database connection pool and block while they wait for other workers to release their connections. Use a smaller worker pool or a larger database connection pool instead.



118
119
120
# File 'lib/action_cable/server/base.rb', line 118

def worker_pool
  @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) }
end