Class: Vines::Cluster
- Inherits: Object
- Includes: Log
- Defined in:
  lib/vines/cluster.rb,
  lib/vines/cluster/pubsub.rb,
  lib/vines/cluster/sessions.rb,
  lib/vines/cluster/publisher.rb,
  lib/vines/cluster/connection.rb,
  lib/vines/cluster/subscriber.rb
Overview
Server instances may be connected to one another in a cluster so they can host a single chat domain, or set of domains, across many servers, transparently to users. A redis database is used for the session routing table, mapping JIDs to their node’s location. Redis pubsub channels are used to communicate amongst nodes.
Using a shared in-memory cache, like redis, rather than synchronizing the cache to each node, allows us to add cluster nodes dynamically, without updating all other nodes’ config files. It also greatly reduces the amount of memory required by the chat server processes.
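As a rough illustration of the routing table described above, the sketch below maps full JIDs to node IDs. A plain Hash stands in for redis, and the `RoutingTable` class, its method names, and the node IDs are hypothetical; none of them are taken from the Vines source.

```ruby
# Hypothetical stand-in for the shared routing table.
# A Hash plays the role that redis plays in a real cluster.
class RoutingTable
  def initialize
    @sessions = {} # full JID => ID of the node hosting the session
  end

  # Record which node hosts the session for this JID.
  def save(jid, node)
    @sessions[jid] = node
  end

  # Look up the node hosting this JID, or nil if there is no session.
  def node_for(jid)
    @sessions[jid]
  end

  # Drop every session hosted by a node, e.g. after it leaves the cluster.
  def delete_all(node)
    @sessions.delete_if { |_jid, hosted_on| hosted_on == node }
  end
end

table = RoutingTable.new
table.save('alice@wonderland.lit/tea', 'node-a')
table.save('hatter@wonderland.lit/party', 'node-b')
table.delete_all('node-b') # only the session on node-a remains
```

Because every node reads and writes the same table, a new node only needs the address of the shared store; the other nodes never have to be told about it.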
Defined Under Namespace
Classes: Connection, PubSub, Publisher, Sessions, StreamProxy, Subscriber, UserProxy
Instance Attribute Summary
- #id ⇒ Object (readonly)
  Returns the value of attribute id.
Instance Method Summary
- #add_pubsub_node(domain, node) ⇒ Object
  Create a pubsub topic (a.k.a. node), in the given domain, to which messages may be published.
- #connect ⇒ Object
  Create a new redis connection.
- #connected_resources(jid) ⇒ Object
  Return the connected streams for this user, without any proxy streams to remote cluster nodes (locally connected streams only).
- #connection ⇒ Object
  Return the shared redis connection for most queries to use.
- #delete_pubsub_node(domain, node) ⇒ Object
  Remove a pubsub topic so messages may no longer be broadcast to it.
- #delete_session(jid) ⇒ Object
  Remove this user from the cluster routing table so that no further stanzas may be routed to them.
- #delete_sessions(node) ⇒ Object
  Remove all user sessions from the routing table associated with the given node ID.
- #initialize(config, &block) ⇒ Cluster (constructor)
  A new instance of Cluster.
- #poke(node, time) ⇒ Object
  Notify the session store that this node is still alive.
- #pubsub_node?(domain, node) ⇒ Boolean
  Return true if the pubsub topic exists and messages may be published to it.
- #pubsub_subscribed?(domain, node, jid) ⇒ Boolean
  Return true if the JID is a registered subscriber to the pubsub topic and messages published to it should be routed to the JID.
- #pubsub_subscribers(domain, node) ⇒ Object
  Return a list of JIDs subscribed to the pubsub topic.
- #query(name, *args) ⇒ Object
  Turn an asynchronous redis query into a blocking call by pausing the fiber in which this code is running.
- #remote_sessions(*jids) ⇒ Object
  Returns any streams hosted at remote nodes for these JIDs.
- #route(stanza, node) ⇒ Object
  Send the stanza to the node hosting the user’s session.
- #save_session(jid, attrs) ⇒ Object
  Persist the user’s session to the shared redis cache so that other cluster nodes can locate the node hosting this user’s connection and route messages to them.
- #start ⇒ Object
  Join this node to the cluster by broadcasting its state to the other nodes, subscribing to redis channels, and scheduling periodic heartbeat broadcasts.
- #storage(domain) ⇒ Object
  Return the Storage implementation for this domain or nil if the domain is not hosted here.
- #subscribe_pubsub(domain, node, jid) ⇒ Object
  Subscribe the JID to the pubsub topic so it will receive any messages published to it.
- #unsubscribe_all_pubsub(domain, jid) ⇒ Object
  Unsubscribe the JID from all pubsub topics.
- #unsubscribe_pubsub(domain, node, jid) ⇒ Object
  Unsubscribe the JID from the pubsub topic, deregistering its interest in receiving any messages published to it.
- #update_user(jid, node) ⇒ Object
  Notify the remote node that the user’s roster has changed and it should reload the user from storage.
Methods included from Log
Constructor Details
#initialize(config, &block) ⇒ Cluster
Returns a new instance of Cluster.
    # File 'lib/vines/cluster.rb', line 29
    def initialize(config, &block)
      @config, @id = config, Kit.uuid
      @connection = Connection.new
      @sessions = Sessions.new(self)
      @publisher = Publisher.new(self)
      @subscriber = Subscriber.new(self)
      @pubsub = PubSub.new(self)
      instance_eval(&block)
    end
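Because the constructor passes its block to instance_eval, the block runs with the new instance as self, which is what lets a configuration block call methods without an explicit receiver. The sketch below shows that mechanism in isolation. The `MiniCluster` class and its `host` and `port` settings are hypothetical and are not taken from the Vines source; the `if block` guard is also an addition of this sketch.

```ruby
# Hypothetical class showing how instance_eval turns a block into a small DSL.
class MiniCluster
  attr_reader :settings

  def initialize(&block)
    @settings = {}
    # Run the block with self set to this instance, so bare method calls
    # inside the block resolve to the methods defined below.
    instance_eval(&block) if block
  end

  # Hypothetical setting, for illustration only.
  def host(name)
    @settings[:host] = name
  end

  # Hypothetical setting, for illustration only.
  def port(number)
    @settings[:port] = number
  end
end

cluster = MiniCluster.new do
  host 'redis.wonderland.lit'
  port 6379
end
```

After construction, `cluster.settings` holds both values set inside the block.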
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id.
    # File 'lib/vines/cluster.rb', line 17
    def id
      @id
    end
Instance Method Details
#add_pubsub_node(domain, node) ⇒ Object
Create a pubsub topic (a.k.a. node), in the given domain, to which messages may be published. The domain argument will be one of the configured pubsub subdomains in conf/config.rb (e.g. games.wonderland.lit, topics.wonderland.lit, etc).
    # File 'lib/vines/cluster.rb', line 144
    def add_pubsub_node(domain, node)
      @pubsub.add_node(domain, node)
    end
#connect ⇒ Object
Create a new redis connection.
    # File 'lib/vines/cluster.rb', line 113
    def connect
      @connection.create
    end
#connected_resources(jid) ⇒ Object
Return the connected streams for this user, without any proxy streams to remote cluster nodes (locally connected streams only).
    # File 'lib/vines/cluster.rb', line 130
    def connected_resources(jid)
      @config.router.connected_resources(jid, jid, false)
    end
#connection ⇒ Object
Return the shared redis connection for most queries to use.
    # File 'lib/vines/cluster.rb', line 108
    def connection
      @connection.connect
    end
#delete_pubsub_node(domain, node) ⇒ Object
Remove a pubsub topic so messages may no longer be broadcast to it.
    # File 'lib/vines/cluster.rb', line 149
    def delete_pubsub_node(domain, node)
      @pubsub.delete_node(domain, node)
    end
#delete_session(jid) ⇒ Object
Remove this user from the cluster routing table so that no further stanzas may be routed to them. This must be called when the user’s session is terminated, either by logout or stream disconnect.
    # File 'lib/vines/cluster.rb', line 75
    def delete_session(jid)
      @sessions.delete(jid)
    end
#delete_sessions(node) ⇒ Object
Remove from the routing table all user sessions associated with the given node ID. Cluster nodes call this themselves during normal shutdown. However, if a node dies without being properly shut down, the other nodes will clean up its sessions when they detect the node is offline.
    # File 'lib/vines/cluster.rb', line 83
    def delete_sessions(node)
      @sessions.delete_all(node)
    end
#poke(node, time) ⇒ Object
Notify the session store that this node is still alive. The node broadcasts its current time, so all cluster members’ clocks don’t necessarily need to be in sync.
    # File 'lib/vines/cluster.rb', line 90
    def poke(node, time)
      @sessions.poke(node, time)
    end
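One way to picture heartbeat tracking is a table of the last time each node reported, which is then scanned for entries that have gone quiet. The sketch below is a minimal illustration under that assumption. The `Heartbeats` class, the `stale` method, and the five-second timeout are hypothetical, and a Hash stands in for the redis store; how Vines actually detects offline nodes is not shown on this page.

```ruby
# Hypothetical heartbeat table; a Hash stands in for the redis store.
class Heartbeats
  def initialize
    @last_seen = {} # node ID => last time reported by that node
  end

  # Record the time the node reported in its heartbeat.
  def poke(node, time)
    @last_seen[node] = time
  end

  # IDs of nodes whose last heartbeat is more than timeout seconds
  # older than now.
  def stale(now, timeout)
    @last_seen.select { |_node, time| now - time > timeout }.keys
  end
end

beats = Heartbeats.new
beats.poke('node-a', 100)
beats.poke('node-b', 107)
quiet = beats.stale(110, 5) # hypothetical 5-second timeout
```

Here `node-a` last reported 10 seconds before the check and is returned as stale, while `node-b` reported 3 seconds before and is not.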
#pubsub_node?(domain, node) ⇒ Boolean
Return true if the pubsub topic exists and messages may be published to it.
    # File 'lib/vines/cluster.rb', line 172
    def pubsub_node?(domain, node)
      @pubsub.node?(domain, node)
    end
#pubsub_subscribed?(domain, node, jid) ⇒ Boolean
Return true if the JID is a registered subscriber to the pubsub topic and messages published to it should be routed to the JID.
    # File 'lib/vines/cluster.rb', line 178
    def pubsub_subscribed?(domain, node, jid)
      @pubsub.subscribed?(domain, node, jid)
    end
#pubsub_subscribers(domain, node) ⇒ Object
Return a list of JIDs subscribed to the pubsub topic.
    # File 'lib/vines/cluster.rb', line 183
    def pubsub_subscribers(domain, node)
      @pubsub.subscribers(domain, node)
    end
#query(name, *args) ⇒ Object
Turn an asynchronous redis query into a blocking call by pausing the fiber in which this code is running. Return the result of the query from this method, rather than passing it to a callback block.
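    # File 'lib/vines/cluster.rb', line 120
    def query(name, *args)
      fiber, yielding = Fiber.current, true
      req = connection.send(name, *args)
      # On error, resume the fiber with no value; if the resume itself raises,
      # clear the flag so the Fiber.yield below is skipped.
      req.errback { fiber.resume rescue yielding = false }
      # On success, resume the fiber with the response as the return value.
      req.callback {|response| fiber.resume(response) }
      Fiber.yield if yielding
    end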
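The fiber technique used by query can be tried without redis or EventMachine. In the sketch below, `FakeRequest` is a hypothetical stand-in for the request object returned by the connection, and `blocking_query` is a simplified version of the method above that leaves out the errback handling.

```ruby
# Hypothetical deferrable: collects callbacks and fires them when the
# simulated reply arrives. It stands in for an asynchronous request object.
class FakeRequest
  def initialize
    @callbacks = []
  end

  def callback(&block)
    @callbacks << block
  end

  def succeed(response)
    @callbacks.each { |cb| cb.call(response) }
  end
end

# Pause the current fiber until the request completes, then return the
# response as the value of this method.
def blocking_query(request)
  fiber = Fiber.current
  request.callback { |response| fiber.resume(response) }
  Fiber.yield
end

pending = FakeRequest.new
result = nil

# The query must run inside a fiber so there is something to pause.
worker = Fiber.new { result = blocking_query(pending) }
worker.resume           # runs until Fiber.yield, then control returns here
pending.succeed('PONG') # the simulated reply resumes the paused fiber
```

The caller inside the fiber sees an ordinary return value, while the code outside the fiber stays free to do other work between `worker.resume` and the reply.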
#remote_sessions(*jids) ⇒ Object
Returns any streams hosted at remote nodes for these JIDs. The streams act like normal EM::Connections, but are actually proxies that route stanzas over redis pubsub channels to remote nodes.
    # File 'lib/vines/cluster.rb', line 59
    def remote_sessions(*jids)
      @sessions.find(*jids).map do |session|
        StreamProxy.new(self, session)
      end
    end
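The proxy idea can be sketched as an object that offers a stream-like write method but hands each stanza to a publisher instead of a socket. Everything in the sketch is hypothetical: `ProxyStream`, `RecordingPublisher`, the `write` method, and the shape of the session Hash are illustrative and do not reproduce the real StreamProxy interface.

```ruby
# Hypothetical publisher that records what would be sent over redis pubsub.
class RecordingPublisher
  attr_reader :sent

  def initialize
    @sent = []
  end

  def route(stanza, node)
    @sent << [node, stanza]
  end
end

# Hypothetical proxy: exposes a write method like a local stream, but
# forwards each stanza to the node that hosts the real connection.
class ProxyStream
  attr_reader :jid

  def initialize(publisher, session)
    @publisher = publisher
    @jid = session[:jid]
    @node = session[:node]
  end

  def write(stanza)
    @publisher.route(stanza, @node)
  end
end

publisher = RecordingPublisher.new
sessions = [{ jid: 'alice@wonderland.lit/tea', node: 'node-b' }]
streams = sessions.map { |session| ProxyStream.new(publisher, session) }
streams.each { |stream| stream.write('<message/>') }
```

Code that delivers stanzas can then treat local and remote streams alike, since both respond to the same call.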
#route(stanza, node) ⇒ Object
Send the stanza to the node hosting the user’s session. The stanza is published to the channel to which the remote node is listening for messages.
    # File 'lib/vines/cluster.rb', line 97
    def route(stanza, node)
      @publisher.route(stanza, node)
    end
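Routing by channel can be pictured as each node listening on a channel derived from its own ID, so that publishing to that channel reaches only that node. The sketch below models this with an in-memory bus. The `Bus` class and the `cluster:nodes:<id>` channel names are assumptions made for illustration; the page does not state the channel names Vines uses.

```ruby
# Hypothetical in-memory bus standing in for redis pubsub channels.
class Bus
  def initialize
    # channel name => list of handlers listening on that channel
    @listeners = Hash.new { |hash, channel| hash[channel] = [] }
  end

  def subscribe(channel, &handler)
    @listeners[channel] << handler
  end

  # Deliver the message to every handler on the channel, if any.
  def publish(channel, message)
    @listeners[channel].each { |handler| handler.call(message) }
  end
end

bus = Bus.new
inbox = [] # stanzas received by the hypothetical node-b

# node-b listens on a channel named after its own ID (assumed naming).
bus.subscribe('cluster:nodes:node-b') { |message| inbox << message }

bus.publish('cluster:nodes:node-b', '<message to="hatter@wonderland.lit"/>')
bus.publish('cluster:nodes:node-c', '<message/>') # no listener, so dropped
```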
#save_session(jid, attrs) ⇒ Object
Persist the user’s session to the shared redis cache so that other cluster nodes can locate the node hosting this user’s connection and route messages to them.
    # File 'lib/vines/cluster.rb', line 68
    def save_session(jid, attrs)
      @sessions.save(jid, attrs)
    end
#start ⇒ Object
Join this node to the cluster by broadcasting its state to the other nodes, subscribing to redis channels, and scheduling periodic heartbeat broadcasts. This method must be called after initialize or this node will not be a cluster member.
    # File 'lib/vines/cluster.rb', line 43
    def start
      @connection.connect
      @publisher.broadcast(:online)
      @subscriber.subscribe

      EM.add_periodic_timer(1) { heartbeat }

      at_exit do
        @publisher.broadcast(:offline)
        @sessions.delete_all(@id)
      end
    end
#storage(domain) ⇒ Object
Return the Storage implementation for this domain or nil if the domain is not hosted here.
    # File 'lib/vines/cluster.rb', line 136
    def storage(domain)
      @config.storage(domain)
    end
#subscribe_pubsub(domain, node, jid) ⇒ Object
Subscribe the JID to the pubsub topic so it will receive any messages published to it.
    # File 'lib/vines/cluster.rb', line 155
    def subscribe_pubsub(domain, node, jid)
      @pubsub.subscribe(domain, node, jid)
    end
#unsubscribe_all_pubsub(domain, jid) ⇒ Object
Unsubscribe the JID from all pubsub topics. This is useful when the JID’s session ends by logout or disconnect.
    # File 'lib/vines/cluster.rb', line 167
    def unsubscribe_all_pubsub(domain, jid)
      @pubsub.unsubscribe_all(domain, jid)
    end
#unsubscribe_pubsub(domain, node, jid) ⇒ Object
Unsubscribe the JID from the pubsub topic, deregistering its interest in receiving any messages published to it.
    # File 'lib/vines/cluster.rb', line 161
    def unsubscribe_pubsub(domain, node, jid)
      @pubsub.unsubscribe(domain, node, jid)
    end
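Taken together, the pubsub methods on this class behave like operations on a set of subscriber JIDs kept per topic. The sketch below models that with an in-memory registry. `TopicRegistry` is hypothetical and keeps its state in a Hash, whereas the cluster shares this state between nodes; the choice to ignore subscriptions to a missing topic is also an assumption of the sketch.

```ruby
require 'set'

# Hypothetical in-memory registry of pubsub topics and their subscribers.
class TopicRegistry
  def initialize
    @topics = {} # [domain, node] => Set of subscriber JIDs
  end

  def add_node(domain, node)
    @topics[[domain, node]] ||= Set.new
  end

  def delete_node(domain, node)
    @topics.delete([domain, node])
  end

  def node?(domain, node)
    @topics.key?([domain, node])
  end

  # Subscribing to a missing topic is ignored in this sketch.
  def subscribe(domain, node, jid)
    @topics[[domain, node]]&.add(jid)
  end

  def unsubscribe(domain, node, jid)
    @topics[[domain, node]]&.delete(jid)
  end

  # Remove the JID from every topic in the domain.
  def unsubscribe_all(domain, jid)
    @topics.each { |(dom, _node), jids| jids.delete(jid) if dom == domain }
  end

  def subscribed?(domain, node, jid)
    @topics.fetch([domain, node], Set.new).include?(jid)
  end

  def subscribers(domain, node)
    @topics.fetch([domain, node], Set.new).to_a
  end
end

registry = TopicRegistry.new
registry.add_node('games.wonderland.lit', 'croquet')
registry.subscribe('games.wonderland.lit', 'croquet', 'alice@wonderland.lit')
registry.unsubscribe_all('games.wonderland.lit', 'alice@wonderland.lit')
```

After the last call the topic still exists but has no subscribers, which mirrors what happens when a JID's session ends.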
#update_user(jid, node) ⇒ Object
Notify the remote node that the user’s roster has changed and it should reload the user from storage.
    # File 'lib/vines/cluster.rb', line 103
    def update_user(jid, node)
      @publisher.update_user(jid, node)
    end