Class: Vines::Cluster::Sessions
- Inherits:
- Object
- Object
- Vines::Cluster::Sessions
- Includes:
- Log
- Defined in:
- lib/vines/cluster/sessions.rb
Overview
Manages the cluster node list and user session routing table stored in redis. All cluster nodes share this in-memory database to quickly discover the node hosting a particular user session. Once a session is located, stanzas can be routed to that node via the Publisher.
Constant Summary
- NODES = 'cluster:nodes'.freeze
Instance Method Summary
-
#delete(jid) ⇒ Object
Remove this user from the cluster routing table so that no further stanzas may be routed to them.
-
#delete_all(node) ⇒ Object
Remove all user sessions from the routing table associated with the given node ID.
-
#expire ⇒ Object
Cluster nodes broadcast a heartbeat to other members every second.
-
#find(*jids) ⇒ Object
Return the sessions for these JIDs.
-
#initialize(cluster) ⇒ Sessions
constructor
A new instance of Sessions.
-
#poke(node, time) ⇒ Object
Notify the session store that this node is still alive.
-
#save(jid, attrs) ⇒ Object
Persist the user’s session to the shared redis cache so that other cluster nodes can locate the node hosting this user’s connection and route messages to them.
Methods included from Log
Constructor Details
#initialize(cluster) ⇒ Sessions
Returns a new instance of Sessions.
# File 'lib/vines/cluster/sessions.rb', line 14
def initialize(cluster)
  @cluster, @nodes = cluster, {}
end
Instance Method Details
#delete(jid) ⇒ Object
Remove this user from the cluster routing table so that no further stanzas may be routed to them. This must be called when the user’s session is terminated, either by logout or stream disconnect.
# File 'lib/vines/cluster/sessions.rb', line 43
def delete(jid)
  jid = JID.new(jid)
  redis.hget("sessions:#{jid.}", jid.resource) do |response|
    if doc = JSON.parse(response) rescue nil
      redis.multi
      redis.hdel("sessions:#{jid.}", jid.resource)
      redis.srem("cluster:nodes:#{doc['node']}", jid.to_s)
      redis.exec
    end
  end
end
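The lookup-then-remove flow described for #delete can be sketched with plain Ruby collections standing in for redis. This is an illustration only: `delete_session`, `sessions`, and `node_members` are hypothetical names, and the session key is assumed to be built from the bare JID (user@domain).

```ruby
require 'json'
require 'set'

# Hypothetical in-memory stand-ins for the two redis structures:
#   sessions:     "sessions:<bare jid>"     => { resource => session JSON }
#   node_members: "cluster:nodes:<node id>" => Set of full JIDs
sessions = {
  'sessions:alice@example.com' => { 'laptop' => { node: 'node-1' }.to_json }
}
node_members = {
  'cluster:nodes:node-1' => Set['alice@example.com/laptop']
}

# Remove one session. The stored JSON is read first because it records
# which node's member set also has to be updated.
def delete_session(sessions, node_members, full_jid)
  bare, resource = full_jid.split('/', 2)
  raw = sessions.fetch("sessions:#{bare}", {})[resource]
  return false if raw.nil?                 # nothing routed for this JID

  doc = begin
    JSON.parse(raw)
  rescue JSON::ParserError
    nil                                    # ignore malformed entries
  end
  return false unless doc

  sessions["sessions:#{bare}"].delete(resource)
  members = node_members["cluster:nodes:#{doc['node']}"]
  members&.delete(full_jid)
  true
end

removed = delete_session(sessions, node_members, 'alice@example.com/laptop')
missing = delete_session(sessions, node_members, 'bob@example.com/phone')
```

Unlike the listing above, this sketch has no MULTI/EXEC transaction, so the two removals are not atomic.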
#delete_all(node) ⇒ Object
Remove all user sessions from the routing table associated with the given node ID. Cluster nodes call this themselves during normal shutdown. However, if a node dies without being properly shut down, the other nodes will clean up its sessions when they detect the node is offline.
# File 'lib/vines/cluster/sessions.rb', line 59
def delete_all(node)
  @nodes.delete(node)
  redis.smembers("cluster:nodes:#{node}") do |jids|
    redis.multi
    redis.del("cluster:nodes:#{node}")
    redis.hdel(NODES, node)
    jids.each do |jid|
      jid = JID.new(jid)
      redis.hdel("sessions:#{jid.}", jid.resource)
    end
    redis.exec
  end
end
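As a rough illustration of what #delete_all removes, the sketch below purges a node from three in-memory structures that mirror the redis keys. `purge_node` and the variable names are hypothetical, key prefixes are omitted for brevity, and session keys are assumed to use the bare JID.

```ruby
require 'set'

# Hypothetical in-memory stand-ins for the redis keys touched by delete_all.
heartbeats = { 'node-1' => 1_700_000_000, 'node-2' => 1_700_000_000 } # cluster:nodes
node_members = {                                                      # cluster:nodes:<id>
  'node-1' => Set['alice@example.com/laptop', 'bob@example.com/phone'],
  'node-2' => Set['alice@example.com/tablet']
}
sessions = {                                                          # sessions:<bare jid>
  'alice@example.com' => { 'laptop' => '{"node":"node-1"}',
                           'tablet' => '{"node":"node-2"}' },
  'bob@example.com'   => { 'phone' => '{"node":"node-1"}' }
}

# Drop the node's member set, its heartbeat entry, and every session it hosted.
# Returns the number of sessions that were purged.
def purge_node(heartbeats, node_members, sessions, node)
  jids = node_members.delete(node) || Set.new
  heartbeats.delete(node)
  jids.each do |full_jid|
    bare, resource = full_jid.split('/', 2)
    sessions[bare]&.delete(resource)
  end
  jids.size
end

purged = purge_node(heartbeats, node_members, sessions, 'node-1')
```

Sessions hosted on other nodes (here, the `tablet` resource on `node-2`) are left untouched.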
#expire ⇒ Object
Cluster nodes broadcast a heartbeat to other members every second. If we haven’t heard from a node in five seconds, assume it’s offline and clean up its session cache for it. Nodes may die abruptly, without a chance to clear their sessions, so other members clean up for them.
# File 'lib/vines/cluster/sessions.rb', line 77
def expire
  redis.hset(NODES, @cluster.id, Time.now.to_i)
  redis.hgetall(NODES) do |response|
    now = Time.now
    expired = Hash[*response].select do |node, active|
      offset = @nodes[node] || 0
      (now - offset) - Time.at(active.to_i) > 5
    end.keys
    expired.each {|node| delete_all(node) }
  end
end
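The staleness test can be isolated as a pure function to make the arithmetic easier to follow. `expired_nodes` is a hypothetical helper, not part of the class; it takes the heartbeat table and the per-node clock offsets as plain hashes and applies the same comparison as the listing above.

```ruby
# Returns the IDs of nodes whose last heartbeat is more than `timeout`
# seconds old, after shifting the local clock by each node's known offset.
def expired_nodes(heartbeats, offsets, now, timeout = 5)
  heartbeats.select do |node, active|
    offset = offsets[node] || 0          # nodes with no recorded offset get no correction
    (now - offset) - Time.at(active.to_i) > timeout
  end.keys
end

now        = Time.at(1_000)
heartbeats = { 'a' => '998', 'b' => '990', 'c' => '960' } # node id => last heartbeat
offsets    = { 'c' => 38 }                                # node c's clock is 38s behind ours

stale = expired_nodes(heartbeats, offsets, now)
```

Node `b` is 10 seconds stale and is reported. Node `c` looks 40 seconds stale on the local clock but only 2 seconds stale once its offset is applied, so it is kept.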
#find(*jids) ⇒ Object
Return the sessions for these JIDs. If a bare JID is used, all sessions for that user will be returned. If a full JID is used, the session for that single connected stream is returned.
# File 'lib/vines/cluster/sessions.rb', line 21
def find(*jids)
  jids.flatten.map do |jid|
    jid = JID.new(jid)
    jid. ? user_sessions(jid) : user_session(jid)
  end.compact.flatten
end
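The bare-versus-full JID behaviour can be sketched against an in-memory routing table. `lookup` and `routes` are hypothetical, and a JID is treated as bare here simply when it has no `/resource` part; the real class delegates that decision to its JID object.

```ruby
require 'json'

# Hypothetical routing table: bare JID => { resource => session JSON }.
routes = {
  'alice@example.com' => {
    'laptop' => { node: 'node-1' }.to_json,
    'tablet' => { node: 'node-2' }.to_json
  }
}

# Accepts any mix of bare and full JIDs and returns a flat list of sessions.
def lookup(routes, *jids)
  jids.flatten.map do |jid|
    bare, resource = jid.to_s.split('/', 2)
    by_resource = routes.fetch(bare, {})
    if resource.nil?
      by_resource.values.map { |raw| JSON.parse(raw) } # every session for the user
    else
      raw = by_resource[resource]
      raw && JSON.parse(raw)                            # one session, or nil
    end
  end.compact.flatten
end

all_sessions = lookup(routes, 'alice@example.com')
one_session  = lookup(routes, 'alice@example.com/tablet')
none         = lookup(routes, 'carol@example.com/phone')
```

The trailing `compact.flatten` is what lets a missing full JID disappear from the result instead of showing up as `nil`.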
#poke(node, time) ⇒ Object
Notify the session store that this node is still alive. The node broadcasts its current time, so all cluster members’ clocks don’t necessarily need to be in sync.
# File 'lib/vines/cluster/sessions.rb', line 92
def poke(node, time)
  offset = Time.now.to_i - time
  @nodes[node] = offset
end
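A small worked example shows why recording the offset removes the need for synchronized clocks. `clock_offset` is a hypothetical helper that repeats the subtraction from the listing above; network latency between the broadcast and its receipt is ignored in this sketch.

```ruby
# Offset between this node's clock and a peer's clock, in whole seconds.
def clock_offset(local_now, peer_time)
  local_now.to_i - peer_time
end

local_now = Time.at(1_000) # our clock
peer_time = 970            # the peer's clock runs 30 seconds behind ours
offset    = clock_offset(local_now, peer_time)

# Shifting our clock by the offset recovers the peer's notion of "now",
# so timestamps the peer wrote can be compared in the peer's own time frame.
peer_now = local_now - offset
```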
#save(jid, attrs) ⇒ Object
Persist the user’s session to the shared redis cache so that other cluster nodes can locate the node hosting this user’s connection and route messages to them.
# File 'lib/vines/cluster/sessions.rb', line 31
def save(jid, attrs)
  jid = JID.new(jid)
  session = {node: @cluster.id}.merge(attrs)
  redis.multi
  redis.hset("sessions:#{jid.}", jid.resource, session.to_json)
  redis.sadd("cluster:nodes:#{@cluster.id}", jid.to_s)
  redis.exec
end
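The two writes performed by #save can be mimicked with in-memory collections to show what ends up stored. `save_session`, the collection names, and the `available` attribute are hypothetical, key prefixes are omitted, and the session key is assumed to be the bare JID.

```ruby
require 'json'
require 'set'

sessions     = Hash.new { |h, k| h[k] = {} }      # sessions:<bare jid>
node_members = Hash.new { |h, k| h[k] = Set.new } # cluster:nodes:<node id>

# Store the session JSON under the user's resource and record the full JID
# in the hosting node's member set.
def save_session(sessions, node_members, node_id, full_jid, attrs)
  bare, resource = full_jid.split('/', 2)
  session = { node: node_id }.merge(attrs)
  sessions[bare][resource] = session.to_json
  node_members[node_id] << full_jid
  session
end

save_session(sessions, node_members, 'node-1',
             'alice@example.com/laptop', { 'available' => true })
stored = JSON.parse(sessions['alice@example.com']['laptop'])
```

The member set is what later lets a node's sessions be enumerated and removed in one pass. The listing above wraps both writes in MULTI/EXEC; this sketch does not model that transaction.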