Class: Celluloid::Presence::ZkService

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Notifications
Defined in:
lib/celluloid-presence/service.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ ZkService

Create a single connection to Zookeeper that persists through Actor crashes

servers: a list of Zookeeper servers to connect to. Each server in the

list has a host/port configuration


28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/celluloid-presence/service.rb', line 28

def initialize(options)
  @@zk ||= begin
    Fanout.supervise_as :notifications_fanout if Actor[:notifications_fanout].nil?
    
    # Let them specify a single server instead of many
    servers = options[:server].nil? ? options[:servers] : [options[:server]]
    raise "no Zookeeper servers given" unless servers
    
    # Add the default Zookeeper port unless specified
    servers.map! do |server|
      if server[/:\d+$/]
        server
      else
        "#{server}:#{DEFAULT_PORT}"
      end
    end
    
    ZK.new(*servers) do |zk|
      zk.on_connecting &proc { Actor[:zk_service].async.on_connecting }
      zk.on_connected  &proc { Actor[:zk_service].async.on_connected }
    end
  end.tap do |zk|
    @@connected = zk.connected?
  end
end

Class Method Details

.init(options) ⇒ Object

Optional init function to supervise this actor



20
21
22
# File 'lib/celluloid-presence/service.rb', line 20

def self.init(options)
  ZkService.supervise_as :zk_service, options
end

Instance Method Details

#connected?Boolean

Our abstracted connected status to avoid race conditions

Returns:

  • (Boolean)


113
114
115
# File 'lib/celluloid-presence/service.rb', line 113

def connected?
  @@connected
end

#ensure(func, *args) ⇒ Object

Ensures important requests are followed through in the case of disconnect / reconnect Ignores request if the node does not exist USE SPARINGLY



122
123
124
125
126
127
128
# File 'lib/celluloid-presence/service.rb', line 122

def ensure(func, *args)
  if connected?
    ensured(func, *args)
  else
    @@ensure.push [func, args]
  end
end

#event_callback(path, event) ⇒ Object

Publishes any zookeeper events that have been registered



79
80
81
# File 'lib/celluloid-presence/service.rb', line 79

def event_callback(path, event)
  publish("zk_event_#{path}", event)
end

#on_connectedObject

Called once a connection is made to the zookeeper cluster Runs any impending ensures before informing any subscribed actors



58
59
60
61
62
63
64
65
# File 'lib/celluloid-presence/service.rb', line 58

def on_connected
  @@connected = true
  while not @@ensure.empty?
    func = @@ensure.shift
    ensured func[0], *func[1] # func[0] == function name, func[1] == arguments
  end
  publish('zk_connected')
end

#on_connectingObject

Called on disconnect from zookeeper Might be that the zookeeper node crashed or zookeeper is down, may not effect any other connections



71
72
73
74
# File 'lib/celluloid-presence/service.rb', line 71

def on_connecting
  @@connected = false
  publish('zk_connecting')
end

#register(path) ⇒ Object

Uses a closure to cleanly pull the zookeeper event back into the protection of celluloid



86
87
88
89
90
91
92
93
# File 'lib/celluloid-presence/service.rb', line 86

def register(path)
  path = path.to_sym
  if @@registry[path].nil?
    callback = proc { |event| Actor[:zk_service].async.event_callback(path, event) }
    @@registry[path] = @@zk.register(path.to_s, &callback)
  end
  @@registry_count[path] = (@@registry_count[path] || 0) + 1
end

#unregister(path) ⇒ Object

unsubscribes from zookeeper events once there are no more listeners



98
99
100
101
102
103
104
105
106
107
108
# File 'lib/celluloid-presence/service.rb', line 98

def unregister(path)
  path = path.to_sym
  if @@registry[path]
    @@registry_count[path] -= 1
    if @@registry_count[path] <= 0
      sub = @@registry.delete(path)
      sub.unsubscribe
      @@registry_count.delete(path)
    end
  end
end