Class: Vines::Cluster::Sessions

Inherits:
Object
Includes:
Log
Defined in:
lib/vines/cluster/sessions.rb

Overview

Manages the cluster node list and user session routing table stored in Redis. All cluster nodes share this in-memory database to quickly discover the node hosting a particular user session. Once a session is located, stanzas can be routed to that node via the Publisher.

Constant Summary

NODES =
'cluster:nodes'.freeze

Instance Method Summary

Methods included from Log

#log

Constructor Details

#initialize(cluster) ⇒ Sessions

Returns a new instance of Sessions.



# File 'lib/vines/cluster/sessions.rb', line 14

def initialize(cluster)
  @cluster, @nodes = cluster, {}
end

Instance Method Details

#delete(jid) ⇒ Object

Remove this user from the cluster routing table so that no further stanzas may be routed to them. This must be called when the user’s session is terminated, either by logout or stream disconnect.



# File 'lib/vines/cluster/sessions.rb', line 43

def delete(jid)
  jid = JID.new(jid)
  redis.hget("sessions:#{jid.bare}", jid.resource) do |response|
    if doc = JSON.parse(response) rescue nil
      redis.multi
      redis.hdel("sessions:#{jid.bare}", jid.resource)
      redis.srem("cluster:nodes:#{doc['node']}", jid.to_s)
      redis.exec
    end
  end
end

#delete_all(node) ⇒ Object

Remove all user sessions associated with the given node ID from the routing table. Cluster nodes call this themselves during normal shutdown. However, if a node dies without being properly shut down, the other nodes will clean up its sessions when they detect the node is offline.



# File 'lib/vines/cluster/sessions.rb', line 59

def delete_all(node)
  @nodes.delete(node)
  redis.smembers("cluster:nodes:#{node}") do |jids|
    redis.multi
    redis.del("cluster:nodes:#{node}")
    redis.hdel(NODES, node)
    jids.each do |jid|
      jid = JID.new(jid)
      redis.hdel("sessions:#{jid.bare}", jid.resource)
    end
    redis.exec
  end
end
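
The per-node set acts as a reverse index: it lets a node's sessions be removed without scanning every user. The following is a minimal sketch of that cleanup using plain Ruby hashes and sets in place of Redis. The JIDs, node IDs, and variable names are hypothetical, and the real method issues these deletions inside a Redis MULTI/EXEC transaction.

```ruby
require 'set'

# In-memory stand-ins for the Redis keys (all values here are made up).
sessions = {
  'sessions:alice@example.com' => { 'laptop' => '{"node":"node-a"}',
                                    'phone'  => '{"node":"node-b"}' },
  'sessions:bob@example.com'   => { 'desk'   => '{"node":"node-a"}' }
}
node_members = {
  'cluster:nodes:node-a' => Set['alice@example.com/laptop', 'bob@example.com/desk'],
  'cluster:nodes:node-b' => Set['alice@example.com/phone']
}

dead = 'node-a'

# Drop the node's membership set, then remove each session it listed.
jids = node_members.delete("cluster:nodes:#{dead}") || []
jids.each do |jid|
  bare, resource = jid.split('/', 2)
  sessions["sessions:#{bare}"].delete(resource)
end

remaining = sessions['sessions:alice@example.com'].keys
```

Only the sessions hosted on the dead node are removed; alice's phone session on node-b is left in place.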

#expire ⇒ Object

Cluster nodes broadcast a heartbeat to other members every second. If we haven’t heard from a node in five seconds, assume it’s offline and clean up its session cache for it. Nodes may die abruptly, without a chance to clear their sessions, so other members clean up for them.



# File 'lib/vines/cluster/sessions.rb', line 77

def expire
  redis.hset(NODES, @cluster.id, Time.now.to_i)
  redis.hgetall(NODES) do |response|
    now = Time.now
    expired = Hash[*response].select do |node, active|
      offset = @nodes[node] || 0
      (now - offset) - Time.at(active.to_i) > 5
    end.keys
    expired.each {|node| delete_all(node) }
  end
end
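
To make the expiry arithmetic concrete, here is a small sketch that applies the same comparison to fixed values, with no Redis involved. The helper name expired_nodes, the node IDs, and the timestamps are hypothetical; only the comparison itself is taken from the method above.

```ruby
# heartbeats: node id => last heartbeat in epoch seconds, on that node's own clock
# offsets:    node id => local clock minus that node's clock, as stored by #poke
def expired_nodes(heartbeats, offsets, now, timeout = 5)
  heartbeats.select do |node, active|
    offset = offsets[node] || 0
    # Shift local time onto the remote node's clock before comparing.
    (now - offset) - Time.at(active.to_i) > timeout
  end.keys
end

now        = Time.at(1_000)
heartbeats = { 'a' => '999', 'b' => '990', 'c' => '969' }
offsets    = { 'c' => 30 } # node c's clock runs 30 seconds behind ours

expired_nodes(heartbeats, offsets, now) # => ["b"]
```

Node c's heartbeat looks 31 seconds old on the local clock, but after the 30 second offset is applied it is only one second old, so it is not expired.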

#find(*jids) ⇒ Object

Return the sessions for these JIDs. If a bare JID is used, all sessions for that user will be returned. If a full JID is used, the session for that single connected stream is returned.



# File 'lib/vines/cluster/sessions.rb', line 21

def find(*jids)
  jids.flatten.map do |jid|
    jid = JID.new(jid)
    jid.bare? ? user_sessions(jid) : user_session(jid)
  end.compact.flatten
end
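
The difference between bare and full JID lookups can be sketched with an in-memory table. This is an illustration only: the helper name lookup, the table layout, and the returned hash shape are assumptions, since the private user_session and user_sessions helpers are not shown on this page.

```ruby
# Hypothetical routing table: bare JID => { resource => node id }.
table = {
  'alice@example.com' => { 'laptop' => 'node-a', 'phone' => 'node-b' }
}

def lookup(table, *jids)
  jids.flatten.map do |jid|
    bare, resource = jid.split('/', 2)
    entries = table[bare] || {}
    if resource.nil?
      # Bare JID: every connected resource for the user.
      entries.map { |res, node| { jid: "#{bare}/#{res}", node: node } }
    elsif entries.key?(resource)
      # Full JID: the single matching stream, or nil when it is not connected.
      { jid: jid, node: entries[resource] }
    end
  end.compact.flatten
end

all_sessions = lookup(table, 'alice@example.com')
one_session  = lookup(table, 'alice@example.com/phone')
no_session   = lookup(table, 'bob@example.com/desk')
```

As in the method above, missing sessions are dropped by compact, so an unknown JID yields an empty array rather than nil entries.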

#poke(node, time) ⇒ Object

Notify the session store that this node is still alive. The node broadcasts its current time, so all cluster members’ clocks don’t necessarily need to be in sync.



# File 'lib/vines/cluster/sessions.rb', line 92

def poke(node, time)
  offset = Time.now.to_i - time
  @nodes[node] = offset
end
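
A short sketch of the offset bookkeeping, with the local time passed in explicitly so the result is deterministic. The helper name record_offset and the node IDs are hypothetical; the subtraction is the same one used in the method above.

```ruby
# Hypothetical stand-in for the @nodes table kept by Sessions.
nodes = {}

# Store how far the local clock is ahead of the remote node's clock, in seconds.
def record_offset(nodes, node, remote_time, local_time)
  nodes[node] = local_time - remote_time
end

record_offset(nodes, 'node-b', 1_000, 1_030) # node-b's clock is 30 seconds behind ours
record_offset(nodes, 'node-c', 1_030, 1_030) # node-c's clock matches ours

nodes # => {"node-b"=>30, "node-c"=>0}
```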

#save(jid, attrs) ⇒ Object

Persist the user’s session to the shared Redis cache so that other cluster nodes can locate the node hosting this user’s connection and route messages to them.



# File 'lib/vines/cluster/sessions.rb', line 31

def save(jid, attrs)
  jid = JID.new(jid)
  session = {node: @cluster.id}.merge(attrs)
  redis.multi
  redis.hset("sessions:#{jid.bare}", jid.resource, session.to_json)
  redis.sadd("cluster:nodes:#{@cluster.id}", jid.to_s)
  redis.exec
end
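
The two writes above produce one entry in the user's session hash and one member in the hosting node's set. A minimal in-memory sketch of the resulting data follows; the node ID, JID, and the available attribute are made up for illustration, and plain Ruby collections stand in for Redis.

```ruby
require 'json'
require 'set'

# In-memory stand-ins for the two Redis structures #save writes to.
sessions     = Hash.new { |h, k| h[k] = {} }      # "sessions:<bare jid>" => { resource => JSON }
node_members = Hash.new { |h, k| h[k] = Set.new } # "cluster:nodes:<node id>" => set of full JIDs

node_id        = 'node-a'                         # hypothetical node ID
bare, resource = 'alice@example.com', 'laptop'    # hypothetical JID parts
session        = { node: node_id }.merge(available: true)

sessions["sessions:#{bare}"][resource] = session.to_json
node_members["cluster:nodes:#{node_id}"] << "#{bare}/#{resource}"

stored = JSON.parse(sessions["sessions:#{bare}"][resource])
stored['node'] # => "node-a"
```

Storing the node ID inside the session JSON is what lets #delete later find the matching per-node set for a given JID.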