Class: Celluloid::Presence::ZkService

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Notifications
Defined in:
lib/celluloid-presence/service.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ ZkService

Create a single connection to Zookeeper that persists through Actor crashes

servers: a list of Zookeeper servers to connect to. Each server in the

list has a host/port configuration


28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/celluloid-presence/service.rb', line 28

# Create a single connection to Zookeeper that persists through actor crashes.
#
# The client is memoised in the @@zk class variable, so the connection is
# only built the first time this runs; every call refreshes @@connected
# from the client's current state.
#
# @param options [Hash] either :server (one server) or :servers (a list).
#   Entries that do not end in ":<digits>" get DEFAULT_PORT appended.
# @raise [RuntimeError] if neither :server nor :servers is supplied
def initialize(options)
	@@zk ||= begin
		Fanout.supervise_as :notifications_fanout if Actor[:notifications_fanout].nil?

		# Let them specify a single server instead of many
		servers = options[:server].nil? ? options[:servers] : [options[:server]]
		raise "no Zookeeper servers given" unless servers

		# Add the default Zookeeper port unless specified.
		# Build a new list (map rather than map!) so the caller's
		# options[:servers] array is never modified and may be frozen.
		servers = servers.map do |server|
			if server[/:\d+$/]
				server
			else
				"#{server}:#{DEFAULT_PORT}"
			end
		end

		# NOTE(review): ZK connection strings are normally one comma-separated
		# host list; confirm ZK.new accepts one positional argument per server.
		ZK.new(*servers) do |zk|
			# Connection state changes are routed back through the supervised actor
			zk.on_connecting &proc { Actor[:zk_service].async.on_connecting }
			zk.on_connected  &proc { Actor[:zk_service].async.on_connected }
		end
	end.tap do |zk|
		@@connected = zk.connected?
	end
end

Class Method Details

.init(options) ⇒ Object

Optional init function to supervise this actor



20
21
22
# File 'lib/celluloid-presence/service.rb', line 20

# Optional helper that starts this actor under supervision, registered
# as :zk_service.
#
# @param options [Hash] passed through unchanged to the supervised actor
def self.init(options)
	ZkService.supervise_as(:zk_service, options)
end

Instance Method Details

#connected? ⇒ Boolean

Our abstracted connected status to avoid race conditions

Returns:

  • (Boolean)


113
114
115
# File 'lib/celluloid-presence/service.rb', line 113

# Reports the connection state tracked by this service in @@connected,
# rather than querying the Zookeeper client directly.
#
# @return [Boolean] the last recorded connection state
def connected?
	@@connected
end

#ensure(func, *args) ⇒ Object

Ensures important requests are followed through in the case of a disconnect / reconnect. Ignores the request if the node does not exist. USE SPARINGLY.



122
123
124
125
126
127
128
# File 'lib/celluloid-presence/service.rb', line 122

# Runs a request immediately when connected; otherwise queues it in
# @@ensure so it can be replayed later.
#
# @param func the function name handed to #ensured
# @param args the arguments forwarded with it
def ensure(func, *args)
	return ensured(func, *args) if connected?

	# Not connected: remember the request as a [name, arguments] pair
	@@ensure << [func, args]
end

#event_callback(path, event) ⇒ Object

Publishes any zookeeper events that have been registered



79
80
81
# File 'lib/celluloid-presence/service.rb', line 79

# Publishes a Zookeeper event on the notification topic derived from
# the path it was registered for.
#
# @param path the path the event belongs to
# @param event the event object to hand to subscribers
def event_callback(path, event)
	topic = "zk_event_#{path}"
	publish(topic, event)
end

#on_connected ⇒ Object

Called once a connection is made to the Zookeeper cluster. Runs any pending ensures before informing any subscribed actors.



58
59
60
61
62
63
64
65
# File 'lib/celluloid-presence/service.rb', line 58

# Marks the service as connected, replays every queued ensure request
# in the order it was queued, then publishes 'zk_connected'.
def on_connected
	@@connected = true

	# Each queued entry is a [function name, arguments] pair
	until @@ensure.empty?
		name, arguments = @@ensure.shift
		ensured(name, *arguments)
	end

	publish('zk_connected')
end

#on_connecting ⇒ Object

Called on disconnect from Zookeeper. It might be that the Zookeeper node crashed or that Zookeeper is down; this may not affect any other connections.



71
72
73
74
# File 'lib/celluloid-presence/service.rb', line 71

# Marks the service as disconnected and publishes 'zk_connecting'.
def on_connecting
	# Update the state first so #connected? is already false when the notification goes out
	@@connected = false
	publish('zk_connecting')
end

#register(path) ⇒ Object

Uses a closure to cleanly pull the zookeeper event back into the protection of celluloid



86
87
88
89
90
91
92
93
# File 'lib/celluloid-presence/service.rb', line 86

# Adds a listener for events on a path. The Zookeeper subscription is
# created only for the first listener; later calls just raise the count.
#
# @param path the path to watch (converted to a Symbol for bookkeeping)
# @return [Integer] the listener count for the path after this call
def register(path)
	key = path.to_sym

	if @@registry[key].nil?
		# The closure hands the event to the actor asynchronously instead of
		# handling it inside the Zookeeper callback itself
		handler = proc { |event| Actor[:zk_service].async.event_callback(key, event) }
		@@registry[key] = @@zk.register(key.to_s, &handler)
	end

	listeners = @@registry_count[key] || 0
	@@registry_count[key] = listeners + 1
end

#unregister(path) ⇒ Object

Unsubscribes from Zookeeper events once there are no more listeners.



98
99
100
101
102
103
104
105
106
107
108
# File 'lib/celluloid-presence/service.rb', line 98

# Removes one listener for a path. When the count reaches zero, the
# Zookeeper subscription is cancelled and the bookkeeping entries removed.
#
# @param path the path to stop watching (converted to a Symbol)
def unregister(path)
	key = path.to_sym
	return unless @@registry[key]

	remaining = (@@registry_count[key] -= 1)
	return if remaining > 0

	# Last listener gone: drop the subscription and its counter
	@@registry.delete(key).unsubscribe
	@@registry_count.delete(key)
end