Class: Celluloid::Presence::ZkService
- Inherits:
- Object
- Includes:
- Celluloid, Notifications
- Defined in:
- lib/celluloid-presence/service.rb
Class Method Summary
-
.init(options) ⇒ Object
Optional init function to supervise this actor.
Instance Method Summary
-
#connected? ⇒ Boolean
Our abstracted connected status to avoid race conditions.
-
#ensure(func, *args) ⇒ Object
Ensures important requests are followed through in the case of a disconnect / reconnect. Ignores the request if the node does not exist. USE SPARINGLY.
-
#event_callback(path, event) ⇒ Object
Publishes any zookeeper events that have been registered.
-
#initialize(options) ⇒ ZkService
constructor
Create a single connection to Zookeeper that persists through Actor crashes.
-
#on_connected ⇒ Object
Called once a connection is made to the Zookeeper cluster. Runs any pending ensures before informing subscribed actors.
-
#on_connecting ⇒ Object
Called on disconnect from Zookeeper. The Zookeeper node may have crashed or Zookeeper may be down; other connections may not be affected.
-
#register(path) ⇒ Object
Uses a closure to cleanly pull the zookeeper event back into the protection of celluloid.
-
#unregister(path) ⇒ Object
Unsubscribes from Zookeeper events once there are no more listeners.
Constructor Details
#initialize(options) ⇒ ZkService
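Create a single connection to Zookeeper that persists through Actor crashes.
servers: a list of Zookeeper servers to connect to, each given as a host or host:port string. The default Zookeeper port is appended to entries that omit one.
server: a single server, which may be given instead of servers.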
# File 'lib/celluloid-presence/service.rb', line 28
def initialize(options)
  @@zk ||= begin
    Fanout.supervise_as :notifications_fanout if Actor[:notifications_fanout].nil?

    # Let them specify a single server instead of many
    servers = options[:server].nil? ? options[:servers] : [options[:server]]
    raise "no Zookeeper servers given" unless servers

    # Add the default Zookeeper port unless specified
    servers.map! do |server|
      if server[/:\d+$/]
        server
      else
        "#{server}:#{DEFAULT_PORT}"
      end
    end

    ZK.new(*servers) do |zk|
      zk.on_connecting &proc { Actor[:zk_service].async.on_connecting }
      zk.on_connected &proc { Actor[:zk_service].async.on_connected }
    end
  end.tap do |zk|
    @@connected = zk.connected?
  end
end
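The server-list handling in the constructor can be tried in isolation. The following is a minimal sketch, not the library's own code path: `normalize_servers` is a hypothetical helper, and `DEFAULT_PORT` is assumed to be 2181 (ZooKeeper's usual client port), since the value the library defines is not shown on this page.

```ruby
# Hypothetical stand-alone version of the server handling in #initialize.
# DEFAULT_PORT is an assumption (2181 is ZooKeeper's usual client port).
DEFAULT_PORT = 2181

def normalize_servers(options)
  # Accept either :servers (a list) or :server (a single host)
  servers = options[:server].nil? ? options[:servers] : [options[:server]]
  raise ArgumentError, "no Zookeeper servers given" unless servers

  # Append the default port to any entry that lacks an explicit one
  servers.map do |server|
    server[/:\d+$/] ? server : "#{server}:#{DEFAULT_PORT}"
  end
end

p normalize_servers(server: "zk1.example.com")      # => ["zk1.example.com:2181"]
p normalize_servers(servers: ["zk1:2181", "zk2"])   # => ["zk1:2181", "zk2:2181"]
```

Unlike the constructor, which uses `map!`, the sketch uses `map` so the caller's array is left unmodified.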
Class Method Details
.init(options) ⇒ Object
Optional init function to supervise this actor.
# File 'lib/celluloid-presence/service.rb', line 20
def self.init(options)
  ZkService.supervise_as :zk_service, options
end
Instance Method Details
#connected? ⇒ Boolean
Our abstracted connected status to avoid race conditions.
# File 'lib/celluloid-presence/service.rb', line 113
def connected?
  @@connected
end
#ensure(func, *args) ⇒ Object
Ensures important requests are followed through in the case of a disconnect / reconnect. Ignores the request if the node does not exist. USE SPARINGLY.
# File 'lib/celluloid-presence/service.rb', line 122
def ensure(func, *args)
  if connected?
    ensured(func, *args)
  else
    @@ensure.push [func, args]
  end
end
#event_callback(path, event) ⇒ Object
Publishes any zookeeper events that have been registered.
# File 'lib/celluloid-presence/service.rb', line 79
def event_callback(path, event)
  publish("zk_event_#{path}", event)
end
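The topic naming used by `#event_callback` (`zk_event_` followed by the path) can be illustrated with a toy publish/subscribe object. This is a sketch only: `TinyBus` and `zk_event_topic` are hypothetical stand-ins and are not part of Celluloid::Notifications, whose matching and delivery behaviour may differ.

```ruby
# Minimal in-process stand-in for a notification bus (NOT Celluloid::Notifications).
class TinyBus
  def initialize
    @subscribers = Hash.new { |hash, topic| hash[topic] = [] }
  end

  def subscribe(topic, &handler)
    @subscribers[topic] << handler
  end

  def publish(topic, *args)
    return unless @subscribers.key?(topic) # nothing registered for this topic
    @subscribers[topic].each { |handler| handler.call(topic, *args) }
  end
end

# Same topic naming as #event_callback: "zk_event_" followed by the path
def zk_event_topic(path)
  "zk_event_#{path}"
end

bus = TinyBus.new
received = []
bus.subscribe(zk_event_topic("/services/web")) { |topic, event| received << [topic, event] }

bus.publish(zk_event_topic("/services/web"), :node_changed)
bus.publish(zk_event_topic("/services/db"), :node_changed) # no subscriber, ignored

p received # => [["zk_event_/services/web", :node_changed]]
```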
#on_connected ⇒ Object
Called once a connection is made to the Zookeeper cluster. Runs any pending ensures before informing subscribed actors.
# File 'lib/celluloid-presence/service.rb', line 58
def on_connected
  @@connected = true
  while not @@ensure.empty?
    func = @@ensure.shift
    ensured func[0], *func[1] # func[0] == function name, func[1] == arguments
  end
  publish('zk_connected')
end
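The interplay between `#ensure` and `#on_connected` is a deferred-call queue: requests made while disconnected are stored and replayed, in order, before subscribers are told the connection is back. A minimal sketch of that pattern without ZooKeeper or Celluloid follows. `DeferredCalls`, `ensure_call`, and `run` are hypothetical names; the real `ensured` helper is not shown on this page, so `run` only records what it would have executed.

```ruby
# Hypothetical stand-in for the #ensure / #on_connected interplay.
class DeferredCalls
  attr_reader :log

  def initialize
    @connected = false
    @pending = [] # queued [method_name, args] pairs
    @log = []     # records the order in which things happen
  end

  # Run immediately when connected, otherwise queue for later
  def ensure_call(func, *args)
    if @connected
      run(func, *args)
    else
      @pending.push [func, args]
    end
  end

  # Replay queued calls first, then announce the connection
  def on_connected
    @connected = true
    until @pending.empty?
      func, args = @pending.shift
      run(func, *args)
    end
    @log << "publish zk_connected"
  end

  def on_connecting
    @connected = false
    @log << "publish zk_connecting"
  end

  private

  # Placeholder for the real work; here it only logs the call
  def run(func, *args)
    @log << "#{func}(#{args.join(', ')})"
  end
end

calls = DeferredCalls.new
calls.ensure_call(:delete, "/services/web/node1") # queued: not connected yet
calls.on_connected                                 # replays the queue, then notifies
calls.ensure_call(:delete, "/services/web/node2") # runs immediately
puts calls.log
```

Draining the queue before publishing means subscribers reacting to the connected notification see the state left by the deferred requests.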
#on_connecting ⇒ Object
Called on disconnect from Zookeeper. The Zookeeper node may have crashed or Zookeeper may be down; other connections may not be affected.
# File 'lib/celluloid-presence/service.rb', line 71
def on_connecting
  @@connected = false
  publish('zk_connecting')
end
#register(path) ⇒ Object
Uses a closure to cleanly pull the zookeeper event back into the protection of celluloid.
# File 'lib/celluloid-presence/service.rb', line 86
def register(path)
  path = path.to_sym
  if @@registry[path].nil?
    callback = proc { |event| Actor[:zk_service].async.event_callback(path, event) }
    @@registry[path] = @@zk.register(path.to_s, &callback)
  end
  @@registry_count[path] = (@@registry_count[path] || 0) + 1
end
#unregister(path) ⇒ Object
Unsubscribes from Zookeeper events once there are no more listeners.
# File 'lib/celluloid-presence/service.rb', line 98
def unregister(path)
  path = path.to_sym
  if @@registry[path]
    @@registry_count[path] -= 1
    if @@registry_count[path] <= 0
      sub = @@registry.delete(path)
      sub.unsubscribe
      @@registry_count.delete(path)
    end
  end
end
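Taken together, `#register` and `#unregister` implement a reference-counted subscription: one underlying subscription per path, created on the first registration and released when the last listener leaves. A minimal sketch of that bookkeeping, assuming nothing about the ZK gem, is shown here. `FakeSubscription` is a hypothetical stand-in for whatever `@@zk.register` returns, and `RefCountedRegistry` is an illustrative name.

```ruby
# Hypothetical stand-in for the subscription object returned by @@zk.register
class FakeSubscription
  def initialize
    @active = true
  end

  def unsubscribe
    @active = false
  end

  def active?
    @active
  end
end

class RefCountedRegistry
  def initialize
    @registry = {} # path => subscription
    @counts = {}   # path => number of listeners
  end

  def register(path)
    path = path.to_sym
    @registry[path] ||= FakeSubscription.new # subscribe only once per path
    @counts[path] = (@counts[path] || 0) + 1
  end

  def unregister(path)
    path = path.to_sym
    return unless @registry[path]

    @counts[path] -= 1
    return if @counts[path] > 0

    # Last listener gone: release the underlying subscription
    @counts.delete(path)
    @registry.delete(path).unsubscribe
  end

  def subscription(path)
    @registry[path.to_sym]
  end

  def listeners(path)
    @counts.fetch(path.to_sym, 0)
  end
end

registry = RefCountedRegistry.new
registry.register("/services/web")
registry.register("/services/web")
sub = registry.subscription("/services/web")

registry.unregister("/services/web")
puts sub.active? # => true, one listener remains
registry.unregister("/services/web")
puts sub.active? # => false, subscription released
```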