Class: ActionCable::Connection::Base
Defined in: actioncable/lib/action_cable/connection/base.rb
Overview
Action Cable Connection Base
For every WebSocket connection the Action Cable server accepts, a Connection object will be instantiated. This instance becomes the parent of all of the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions based on an identifier sent by the Action Cable consumer. The Connection itself does not deal with any specific application logic beyond authentication and authorization.
Here’s a basic example:
module ApplicationCable
  class Connection < ActionCable::Connection::Base
    identified_by :current_user

    def connect
      self.current_user = find_verified_user
      logger.add_tags current_user.name
    end

    def disconnect
      # Any cleanup work needed when the cable connection is cut.
    end

    private
      def find_verified_user
        User.find_by_identity(cookies.encrypted[:identity_id]) ||
          reject_unauthorized_connection
      end
  end
end
First, we declare that this connection can be identified by its current_user. This lets us later find all connections established for that current_user (and potentially disconnect them). You can declare as many identification indexes as you like. Declaring an identification automatically defines an attr_accessor for that key.
Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
Finally, we add a tag to the connection-specific logger with the name of the current user to easily distinguish their messages in the log.
Pretty simple, eh?
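The identification step described above (declaring a key defines an attr_accessor and records the key) can be sketched in plain Ruby. This is a minimal illustration, not Action Cable's own code: SketchIdentification, SketchConnection, and the identifiers list are hypothetical names chosen for the example.

```ruby
# Hypothetical stand-in for the identification behaviour; not Action Cable's code.
module SketchIdentification
  # Keys declared so far on the extending class.
  def identifiers
    @identifiers ||= []
  end

  # Declaring an identifier defines an attr_accessor and records the key.
  def identified_by(*keys)
    keys.each { |key| attr_accessor key }
    identifiers.concat(keys)
  end
end

class SketchConnection
  extend SketchIdentification
  identified_by :current_user, :current_account
end

connection = SketchConnection.new
connection.current_user = "alice"
```

Because each declared key becomes an ordinary accessor, a lookup such as "all connections for this current_user" only needs to compare the recorded keys and their values.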
Constant Summary
Constants included from ActiveSupport::Callbacks
ActiveSupport::Callbacks::CALLBACK_FILTER_TYPES
Instance Attribute Summary
- #env ⇒ Object (readonly): Returns the value of attribute env.
- #logger ⇒ Object (readonly): Returns the value of attribute logger.
- #protocol ⇒ Object (readonly): Returns the value of attribute protocol.
- #server ⇒ Object (readonly): Returns the value of attribute server.
- #subscriptions ⇒ Object (readonly): Returns the value of attribute subscriptions.
- #worker_pool ⇒ Object (readonly): Returns the value of attribute worker_pool.
Instance Method Summary
- #beat ⇒ Object
- #close(reason: nil, reconnect: true) ⇒ Object: Close the WebSocket connection.
- #dispatch_websocket_message(websocket_message) ⇒ Object: :nodoc:
- #handle_channel_command(payload) ⇒ Object
- #initialize(server, env, coder: ActiveSupport::JSON) ⇒ Base (constructor): A new instance of Base.
- #inspect ⇒ Object: :nodoc:
- #on_close(reason, code) ⇒ Object: :nodoc:
- #on_error(message) ⇒ Object: :nodoc:
- #on_message(message) ⇒ Object: :nodoc:
- #on_open ⇒ Object: :nodoc:
- #process ⇒ Object: Called by the server when a new WebSocket connection is established.
- #receive(websocket_message) ⇒ Object: Decodes WebSocket messages and dispatches them to subscribed channels.
- #send_async(method, *arguments) ⇒ Object: Invoke a method on the connection asynchronously through the pool of thread workers.
- #statistics ⇒ Object: Return a basic hash of statistics for the connection keyed with identifier, started_at, subscriptions, and request_id.
- #transmit(cable_message) ⇒ Object: :nodoc:
Methods included from ActiveSupport::Rescuable
#handler_for_rescue, #rescue_with_handler
Methods included from ActiveSupport::Concern
#append_features, #class_methods, extended, #included, #prepend_features, #prepended
Methods included from ActiveSupport::Callbacks
Methods included from Authorization
#reject_unauthorized_connection
Methods included from Identification
Constructor Details
#initialize(server, env, coder: ActiveSupport::JSON) ⇒ Base
Returns a new instance of Base.
# File 'actioncable/lib/action_cable/connection/base.rb', line 58
def initialize(server, env, coder: ActiveSupport::JSON)
  @server, @env, @coder = server, env, coder

  @worker_pool = server.worker_pool
  @logger = new_tagged_logger

  @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop)
  @subscriptions = ActionCable::Connection::Subscriptions.new(self)
  @message_buffer = ActionCable::Connection::MessageBuffer.new(self)

  @_internal_subscriptions = nil
  @started_at = Time.now
end
Instance Attribute Details
#env ⇒ Object (readonly)
Returns the value of attribute env
# File 'actioncable/lib/action_cable/connection/base.rb', line 55
def env
  @env
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger
# File 'actioncable/lib/action_cable/connection/base.rb', line 55
def logger
  @logger
end
#protocol ⇒ Object (readonly)
Returns the value of attribute protocol
# File 'actioncable/lib/action_cable/connection/base.rb', line 55
def protocol
  @protocol
end
#server ⇒ Object (readonly)
Returns the value of attribute server
# File 'actioncable/lib/action_cable/connection/base.rb', line 55
def server
  @server
end
#subscriptions ⇒ Object (readonly)
Returns the value of attribute subscriptions
# File 'actioncable/lib/action_cable/connection/base.rb', line 55
def subscriptions
  @subscriptions
end
#worker_pool ⇒ Object (readonly)
Returns the value of attribute worker_pool
# File 'actioncable/lib/action_cable/connection/base.rb', line 55
def worker_pool
  @worker_pool
end
Instance Method Details
#beat ⇒ Object
# File 'actioncable/lib/action_cable/connection/base.rb', line 134
def beat
  transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i
end
#close(reason: nil, reconnect: true) ⇒ Object
Close the WebSocket connection.
# File 'actioncable/lib/action_cable/connection/base.rb', line 109
def close(reason: nil, reconnect: true)
  transmit(
    type: ActionCable::INTERNAL[:message_types][:disconnect],
    reason: reason,
    reconnect: reconnect
  )
  websocket.close
end
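To make the order of operations in #close concrete, here is a minimal sketch in plain Ruby: a disconnect message carrying reason and reconnect is transmitted first, then the socket is closed. RecordingSocket and close_sketch are hypothetical names, the "disconnect" string is an assumed stand-in for ActionCable::INTERNAL[:message_types][:disconnect], and "unauthorized" is only an example reason.

```ruby
require "json"

# Assumed value; stands in for ActionCable::INTERNAL[:message_types][:disconnect].
DISCONNECT_TYPE = "disconnect"

# Hypothetical socket double that records what would be sent to the client.
class RecordingSocket
  attr_reader :frames, :closed

  def initialize
    @frames = []
    @closed = false
  end

  def transmit(data)
    @frames << data
  end

  def close
    @closed = true
  end
end

# Mirrors the shape of #close: send the disconnect message, then close.
def close_sketch(socket, reason: nil, reconnect: true)
  socket.transmit(JSON.generate(type: DISCONNECT_TYPE, reason: reason, reconnect: reconnect))
  socket.close
end

socket = RecordingSocket.new
close_sketch(socket, reason: "unauthorized", reconnect: false)
frame = JSON.parse(socket.frames.first)
```

Sending the message before closing gives the client a chance to read the reconnect flag and decide whether to retry.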
#dispatch_websocket_message(websocket_message) ⇒ Object
:nodoc:
# File 'actioncable/lib/action_cable/connection/base.rb', line 90
def dispatch_websocket_message(websocket_message) # :nodoc:
  if websocket.alive?
    handle_channel_command decode(websocket_message)
  else
    logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})"
  end
end
#handle_channel_command(payload) ⇒ Object
# File 'actioncable/lib/action_cable/connection/base.rb', line 98
def handle_channel_command(payload)
  run_callbacks :command do
    subscriptions.execute_command payload
  end
end
#inspect ⇒ Object
:nodoc:
# File 'actioncable/lib/action_cable/connection/base.rb', line 155
def inspect # :nodoc:
  "#<#{self.class.name}:#{'%#016x' % (object_id << 1)}>"
end
#on_close(reason, code) ⇒ Object
:nodoc:
# File 'actioncable/lib/action_cable/connection/base.rb', line 151
def on_close(reason, code) # :nodoc:
  send_async :handle_close
end
#on_error(message) ⇒ Object
:nodoc:
# File 'actioncable/lib/action_cable/connection/base.rb', line 146
def on_error(message) # :nodoc:
  # log errors to make diagnosing socket errors easier
  logger.error "WebSocket error occurred: #{message}"
end
#on_message(message) ⇒ Object
:nodoc:
# File 'actioncable/lib/action_cable/connection/base.rb', line 142
def on_message(message) # :nodoc:
  message_buffer.append message
end
#on_open ⇒ Object
:nodoc:
# File 'actioncable/lib/action_cable/connection/base.rb', line 138
def on_open # :nodoc:
  send_async :handle_open
end
#process ⇒ Object
Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user. This method should not be called directly; instead, rely on the #connect (and #disconnect) callbacks.
# File 'actioncable/lib/action_cable/connection/base.rb', line 74
def process # :nodoc:
  logger.info started_request_message

  if websocket.possible? && allow_request_origin?
    respond_to_successful_request
  else
    respond_to_invalid_request
  end
end
#receive(websocket_message) ⇒ Object
Decodes WebSocket messages and dispatches them to subscribed channels. WebSocket message transfer encoding is always JSON.
# File 'actioncable/lib/action_cable/connection/base.rb', line 86
def receive(websocket_message) # :nodoc:
  send_async :dispatch_websocket_message, websocket_message
end
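The decode-then-dispatch flow described for #receive can be illustrated with a small plain-Ruby sketch: a JSON message is parsed and routed to whichever subscription matches its identifier. SketchSubscriptions, the "chat_1" identifier, and the payload keys used here are assumptions made for the example, not a specification of the wire format.

```ruby
require "json"

# Hypothetical registry mapping a subscription identifier to a handler.
class SketchSubscriptions
  def initialize
    @handlers = {}
  end

  def add(identifier, &handler)
    @handlers[identifier] = handler
  end

  # Route a decoded command to the handler registered for its identifier.
  def execute_command(payload)
    handler = @handlers.fetch(payload["identifier"]) do
      raise KeyError, "no subscription for #{payload['identifier'].inspect}"
    end
    handler.call(payload["data"])
  end
end

received = []
subscriptions = SketchSubscriptions.new
subscriptions.add("chat_1") { |data| received << data }

# The message arrives as JSON text and is decoded before dispatch.
raw = JSON.generate("command" => "message", "identifier" => "chat_1", "data" => "hello")
subscriptions.execute_command(JSON.parse(raw))
```

In this sketch an unknown identifier raises KeyError; how a real connection treats unrecognised commands is outside what the listing above shows.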
#send_async(method, *arguments) ⇒ Object
Invoke a method on the connection asynchronously through the pool of thread workers.
# File 'actioncable/lib/action_cable/connection/base.rb', line 119
def send_async(method, *arguments)
  worker_pool.async_invoke(self, method, *arguments)
end
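As a rough picture of what "asynchronously through the pool of thread workers" means, the sketch below hands a method call to a tiny queue-backed pool instead of running it inline. SketchWorkerPool and AsyncSketchConnection are hypothetical stand-ins written for this example; only the async_invoke(receiver, method, *arguments) call shape is taken from the listing above, and the real worker pool is not shown here.

```ruby
# Hypothetical thread-backed pool; not Action Cable's worker implementation.
class SketchWorkerPool
  def initialize(size: 2)
    @jobs = Queue.new
    @threads = Array.new(size) do
      Thread.new do
        # Each worker pops jobs until it receives the :stop sentinel.
        while (job = @jobs.pop) != :stop
          receiver, method, arguments = job
          receiver.public_send(method, *arguments)
        end
      end
    end
  end

  # Enqueue the call; a worker thread performs it later.
  def async_invoke(receiver, method, *arguments)
    @jobs << [receiver, method, arguments]
  end

  # Let queued jobs finish, then stop and join every worker.
  def shutdown
    @threads.size.times { @jobs << :stop }
    @threads.each(&:join)
  end
end

class AsyncSketchConnection
  attr_reader :handled

  def initialize(pool)
    @pool = pool
    @handled = Queue.new # thread-safe record of processed messages
  end

  def send_async(method, *arguments)
    @pool.async_invoke(self, method, *arguments)
  end

  def handle_message(text)
    @handled << text
  end
end

pool = SketchWorkerPool.new
connection = AsyncSketchConnection.new(pool)
connection.send_async(:handle_message, "ping")
pool.shutdown
```

The caller returns immediately after enqueueing, which is why handlers such as on_open and on_close can delegate work without blocking the thread that received the socket event.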
#statistics ⇒ Object
Return a basic hash of statistics for the connection keyed with identifier, started_at, subscriptions, and request_id. This can be returned by a health check against the connection.
# File 'actioncable/lib/action_cable/connection/base.rb', line 125
def statistics
  {
    identifier: connection_identifier,
    started_at: @started_at,
    subscriptions: subscriptions.identifiers,
    request_id: @env["action_dispatch.request_id"]
  }
end
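One way a health check might consume hashes of this shape is sketched below. The health_summary helper, its output keys, and the sample data are all hypothetical; the only thing taken from the documentation is the four keys of each statistics hash.

```ruby
require "json"

# Hypothetical helper: summarise statistics hashes shaped like the one above.
def health_summary(statistics_list, now: Time.now)
  {
    connections: statistics_list.size,
    subscriptions: statistics_list.sum { |stats| stats[:subscriptions].size },
    oldest_age_seconds: statistics_list.map { |stats| (now - stats[:started_at]).to_i }.max
  }
end

# Made-up sample data for illustration only.
now = Time.at(1_700_000_100)
sample = [
  { identifier: "user_1", started_at: Time.at(1_700_000_000),
    subscriptions: ["chat_1", "chat_2"], request_id: "req-a" },
  { identifier: "user_2", started_at: Time.at(1_700_000_040),
    subscriptions: [], request_id: "req-b" }
]

summary = health_summary(sample, now: now)
puts JSON.generate(summary)
```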
#transmit(cable_message) ⇒ Object
:nodoc:
# File 'actioncable/lib/action_cable/connection/base.rb', line 104
def transmit(cable_message) # :nodoc:
  websocket.transmit encode(cable_message)
end