Class: Rage::Cable::Protocol::ActioncableV1Json
- Inherits: Object
  - Object
  - Rage::Cable::Protocol::ActioncableV1Json
- Defined in: lib/rage/cable/protocol/actioncable_v1_json.rb
Overview
A protocol defines the structure, rules and semantics for exchanging data between the client and the server. The class that defines a protocol should respond to the following methods:
protocol_definition
init
on_open
on_message
serialize
subscribe
broadcast
The two optional methods are:
on_shutdown
on_close
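As a rough illustration of the interface listed above, a custom protocol could start from a skeleton like the one below. The module name `MyProtocol`, the `my-protocol` header value, and the empty method bodies are hypothetical placeholders, not part of Rage.

```ruby
# Hypothetical skeleton of a protocol class; bodies are placeholders only.
module MyProtocol
  REQUIRED = %i[protocol_definition init on_open on_message serialize subscribe broadcast].freeze
  OPTIONAL = %i[on_shutdown on_close].freeze

  # Headers returned to the client after the handshake (value is made up).
  def self.protocol_definition
    { "Sec-WebSocket-Protocol" => "my-protocol" }
  end

  # Keep a reference to the router for later callbacks.
  def self.init(router)
    @router = router
  end

  def self.on_open(connection); end
  def self.on_message(connection, raw_data); end
  def self.serialize(params, data); end
  def self.subscribe(connection, name, params); end
  def self.broadcast(name, data); end
end

# Check that every required method is present.
missing = MyProtocol::REQUIRED.reject { |m| MyProtocol.respond_to?(m) }
puts missing.empty? ? "all required methods defined" : "missing: #{missing.join(', ')}"
```

The optional methods are left out here on purpose; the check only covers the required ones.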
Defined Under Namespace
Modules: COMMAND, MESSAGES, REASON, TYPE
Constant Summary
- HANDSHAKE_HEADERS = { "Sec-WebSocket-Protocol" => "actioncable-v1-json" }
Class Method Summary
- .broadcast(name, data) ⇒ Object
  Broadcast data to all clients connected to a stream.
- .init(router) ⇒ Object
  This method serves as a constructor to prepare the object or set up recurring tasks (e.g. heartbeats).
- .on_close(connection) ⇒ Object
  The method should process client disconnections and call Router#process_disconnection.
- .on_message(connection, raw_data) ⇒ Object
  The method processes messages from existing connections.
- .on_open(connection) ⇒ Object
  The method is called any time a new WebSocket connection is established.
- .protocol_definition ⇒ Object
  The method defines the headers to send to the client after the handshake process.
- .serialize(params, data) ⇒ Object
  Serialize a Ruby object into the format the client would understand.
- .subscribe(connection, name, params) ⇒ Object
  Subscribe to a stream.
Class Method Details
.broadcast(name, data) ⇒ Object
Broadcast data to all clients connected to a stream.
    # File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 158
    def self.broadcast(name, data)
      i, identifiers = 0, @subscription_identifiers[name]

      while i < identifiers.length
        params = identifiers[i]
        ::Iodine.publish("cable:#{name}:#{params.hash}", serialize(params, data))
        i += 1
      end
    end
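The fan-out performed by `.broadcast` can be sketched without a running server. In the sketch below, the `publish` lambda is a hypothetical stand-in for `::Iodine.publish` that only records its arguments, and the stream name and params are made-up sample data.

```ruby
require "json"

# Hypothetical stand-in for ::Iodine.publish: records messages instead of sending them.
published = []
publish = ->(channel, message) { published << [channel, message] }

# Sample data: two distinct sets of subscription params on the same stream.
subscription_identifiers = {
  "chat" => [
    { channel: "ChatChannel", room: 1 },
    { channel: "ChatChannel", room: 2 }
  ]
}

# Same shape as .serialize in this class.
serialize = ->(params, data) { { identifier: params.to_json, message: data }.to_json }

# One publish call per distinct set of params, each on its own channel.
subscription_identifiers["chat"].each do |params|
  publish.call("cable:chat:#{params.hash}", serialize.call(params, { text: "hi" }))
end

published.each { |channel, message| puts "#{channel} -> #{message}" }
```

Each set of params gets its own channel because the serialized message embeds the identifier, so clients subscribed with different params need different payloads.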
.init(router) ⇒ Object
This method serves as a constructor to prepare the object or set up recurring tasks (e.g. heartbeats).
    # File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 55
    def self.init(router)
      @router = router

      ping_counter = Time.now.to_i
      ::Iodine.run_every(3000) do
        ping_counter += 1
        ::Iodine.publish("cable:ping", { type: TYPE::PING, message: ping_counter }.to_json)
      end

      # Hash<String(stream name) => Array<Hash>(subscription params)>
      @subscription_identifiers = Hash.new { |hash, key| hash[key] = [] }
    end
.on_close(connection) ⇒ Object
This method is optional.
The method should process client disconnections and call Router#process_disconnection.
    # File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 132
    def self.on_close(connection)
      @router.process_disconnection(connection)
    end
.on_message(connection, raw_data) ⇒ Object
The method processes messages from existing connections. It should parse the message, call either Router#process_subscription or Router#process_message, and handle its return value.
    # File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 91
    def self.on_message(connection, raw_data)
      parsed_data = Rage::ParamsParser.json_parse(raw_data)

      command, identifier = parsed_data[:command], parsed_data[:identifier]
      params = Rage::ParamsParser.json_parse(identifier)

      # process subscription messages
      if command == COMMAND::SUBSCRIBE
        status = @router.process_subscription(connection, identifier, params[:channel], params)
        if status == :subscribed
          connection.write({ identifier: identifier, type: TYPE::CONFIRM }.to_json)
        elsif status == :rejected
          connection.write({ identifier: identifier, type: TYPE::REJECT }.to_json)
        elsif status == :invalid
          connection.write(MESSAGES::INVALID)
        end

        return
      end

      # process data messages;
      # plain `JSON` is used here to conform with the ActionCable API that passes `data` as a Hash with string keys;
      data = JSON.parse(parsed_data[:data])

      message_status = if command == COMMAND::MESSAGE && data.has_key?("action")
        @router.process_message(connection, identifier, data["action"].to_sym, data)
      elsif command == COMMAND::MESSAGE
        @router.process_message(connection, identifier, :receive, data)
      end

      unless message_status == :processed
        connection.write(MESSAGES::INVALID)
      end
    end
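To make the parsing steps in `.on_message` concrete, here is a minimal sketch of decoding a client frame. It assumes the Action Cable wire format, in which `identifier` and `data` are JSON strings nested inside the outer JSON object, and it assumes `COMMAND::MESSAGE` is the string `"message"`; neither value is shown on this page. `JSON.parse` with `symbolize_names: true` is used as a stand-in for `Rage::ParamsParser.json_parse`.

```ruby
require "json"

# Sample frame as a client might send it (values are made up for illustration).
raw_data = {
  command: "message",
  identifier: { channel: "ChatChannel", room: 1 }.to_json,
  data: { action: "speak", text: "hello" }.to_json
}.to_json

# Outer frame: symbol keys, mirroring parsed_data[:command] in the source.
parsed = JSON.parse(raw_data, symbolize_names: true)

# The identifier is itself a JSON document and needs a second parse.
params = JSON.parse(parsed[:identifier], symbolize_names: true)

# The data payload keeps string keys, as the source comment explains.
data = JSON.parse(parsed[:data])

# Fall back to :receive when the payload names no action.
action = data.key?("action") ? data["action"].to_sym : :receive

puts "channel=#{params[:channel]} action=#{action}"
```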
.on_open(connection) ⇒ Object
The method is called any time a new WebSocket connection is established. It is expected to call Router#process_connection and handle its return value.
    # File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 73
    def self.on_open(connection)
      accepted = @router.process_connection(connection)

      if accepted
        connection.subscribe("cable:ping")
        connection.write(MESSAGES::WELCOME)
      else
        connection.write(MESSAGES::UNAUTHORIZED)
        connection.close
      end
    end
.protocol_definition ⇒ Object
The method defines the headers to send to the client after the handshake process.
    # File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 48
    def self.protocol_definition
      HANDSHAKE_HEADERS
    end
.serialize(params, data) ⇒ Object
Serialize a Ruby object into the format the client would understand.
    # File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 140
    def self.serialize(params, data)
      { identifier: params.to_json, message: data }.to_json
    end
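A standalone copy of the method body shows the shape of the resulting frame. The sample params and data are made up; the point to notice is that `identifier` ends up as a JSON string nested inside the outer JSON object, so a client has to decode it twice.

```ruby
require "json"

# Standalone copy of the serialize body shown above.
def serialize(params, data)
  { identifier: params.to_json, message: data }.to_json
end

frame = serialize({ channel: "ChatChannel" }, { text: "hi" })
puts frame
# {"identifier":"{\"channel\":\"ChatChannel\"}","message":{"text":"hi"}}

# Decoding: the first parse yields a String identifier, the second a Hash.
decoded = JSON.parse(frame)
inner = JSON.parse(decoded["identifier"])
puts inner["channel"]
```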
.subscribe(connection, name, params) ⇒ Object
Subscribe to a stream.
    # File 'lib/rage/cable/protocol/actioncable_v1_json.rb', line 149
    def self.subscribe(connection, name, params)
      connection.subscribe("cable:#{name}:#{params.hash}")
      @subscription_identifiers[name] << params unless @subscription_identifiers[name].include?(params)
    end
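The channel name and the deduplication in `.subscribe` both rely on Ruby's value-based `Hash` equality. The sketch below uses made-up params to show that two equal hashes produce the same `hash` value within a process and are stored only once. The numeric value of `Hash#hash` is not guaranteed to be stable across Ruby processes, so the channel names are only meaningful inside one running process.

```ruby
# Made-up subscription params: a and b are equal by value, c differs.
a = { channel: "ChatChannel", room: 1 }
b = { channel: "ChatChannel", room: 1 }
c = { channel: "ChatChannel", room: 2 }

# Equal hashes yield the same channel name, so they share one pub/sub channel.
channel_a = "cable:chat:#{a.hash}"
channel_b = "cable:chat:#{b.hash}"
puts channel_a == channel_b

# Same default-block Hash and include? guard as in the source.
identifiers = Hash.new { |hash, key| hash[key] = [] }
[a, b, c].each do |params|
  identifiers["chat"] << params unless identifiers["chat"].include?(params)
end

puts identifiers["chat"].length
```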