Class: ZK::EventHandler
- Inherits:
-
Object
- Object
- ZK::EventHandler
- Includes:
- Logging
- Defined in:
- lib/z_k/event_handler.rb
Overview
This is the default watcher provided by the zookeeper connection. Watchers are implemented by adding the :watch => true flag to any #children, #get, or #exists call. You never really need to initialize this yourself.
Constant Summary collapse
- VALID_WATCH_TYPES =
[:data, :child].freeze
- ZOOKEEPER_WATCH_TYPE_MAP =
{ Zookeeper::ZOO_CREATED_EVENT => :data, Zookeeper::ZOO_DELETED_EVENT => :data, Zookeeper::ZOO_CHANGED_EVENT => :data, Zookeeper::ZOO_CHILD_EVENT => :child, }.freeze
Instance Attribute Summary collapse
-
#zk ⇒ Object
:nodoc:.
Instance Method Summary collapse
-
#clear! ⇒ Object
used during shutdown to clear registered listeners.
- #get_default_watcher_block ⇒ Object
-
#process(event) ⇒ Object
called from the client-registered callback when an event fires.
-
#register(path, &block) {|connection, event| ... } ⇒ ZooKeeper::EventHandlerSubscription
(also: #subscribe)
Register a path with the handler; your block will be called with all events on that path.
-
#register_state_handler(state, &block) {|connection, event| ... } ⇒ Object
registers a “state of the connection” handler.
- #safe_call(callbacks, *args) ⇒ Object protected
-
#setup_watcher!(watch_type, opts) ⇒ Object
implements not only setting up the watcher callback, but deduplicating event delivery.
- #state_key(arg) ⇒ Object protected
-
#synchronize ⇒ Object
:nodoc:.
-
#unregister(*args) ⇒ Object
(also: #unsubscribe)
deprecated
Deprecated.
use #unsubscribe on the subscription object
-
#unregister_state_handler(*args) ⇒ Object
deprecated
Deprecated.
use #unsubscribe on the subscription object
- #watcher_callback ⇒ Object protected
Instance Attribute Details
#zk ⇒ Object
:nodoc:
19 20 21 |
# File 'lib/z_k/event_handler.rb', line 19
# :nodoc:
# Reader for the @zk instance variable.
#
# @return [Object] whatever is currently stored in @zk (nil if unset)
def zk
  @zk
end
Instance Method Details
#clear! ⇒ Object
used during shutdown to clear registered listeners
130 131 132 133 134 135 |
# File 'lib/z_k/event_handler.rb', line 130
# Used during shutdown to clear registered listeners.
#
# Empties @callbacks inside a #synchronize block.
#
# @return [nil] the block's last expression is nil
def clear! #:nodoc:
  synchronize do
    @callbacks.clear
    nil
  end
end
#get_default_watcher_block ⇒ Object
141 142 143 144 145 146 147 |
# File 'lib/z_k/event_handler.rb', line 141
# Returns a memoized lambda (cached in @default_watcher_block).
#
# When called with +hash+, the lambda obtains a callback from
# #watcher_callback, invokes it with +hash+, and (because of #tap)
# returns that callback object rather than the result of the call.
#
# @return [Proc] the same lambda on every invocation
def get_default_watcher_block
  @default_watcher_block ||= lambda do |hash|
    watcher_callback.tap do |cb|
      cb.call(hash)
    end
  end
end
#process(event) ⇒ Object
called from the client-registered callback when an event fires
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/z_k/event_handler.rb', line 96
# Called from the client-registered callback when an event fires.
#
# Steps:
# 1. stamps the event with @zk,
# 2. derives the @callbacks key: the event's path for node events, or
#    state_key(event.state) for state events,
# 3. for node events whose type is in ZOOKEEPER_WATCH_TYPE_MAP, removes the
#    path from the matching @outstanding_watches entry so a new watch may
#    be set for it,
# 4. hands a nil-free snapshot of the registered callbacks to #safe_call.
#
# @param event [Object] must respond to zk=, node_event?, state_event?,
#   path, state and type
# @raise [ZKError] when the event is neither a node event nor a state event
# @return [Object] whatever #safe_call returns
def process(event) #:nodoc:
  # logger.debug { "EventHandler#process dispatching event: #{event.inspect}" }# unless event.type == -1
  event.zk = @zk

  node_event = event.node_event?

  cb_key =
    if node_event
      event.path
    elsif event.state_event?
      state_key(event.state)
    else
      raise ZKError, "don't know how to process event: #{event.inspect}"
    end

  # logger.debug { "EventHandler#process: cb_key: #{cb_key}" }

  callbacks = synchronize do
    if node_event && (watch_type = ZOOKEEPER_WATCH_TYPE_MAP[event.type])
      # logger.debug { "re-allowing #{watch_type.inspect} watches on path #{event.path.inspect}" }

      # we received a watch event for this path, now we allow code to set new watchers
      @outstanding_watches[watch_type].delete(event.path)
    end

    # compact returns a new array, so the registered list is never handed
    # out (or mutated) outside the synchronize block; nil entries are dropped.
    @callbacks[cb_key].compact
  end

  safe_call(callbacks, event)
end
#register(path, &block) {|connection, event| ... } ⇒ ZooKeeper::EventHandlerSubscription Also known as: subscribe
Register a path with the handler; your block will be called with all events on that path. Aliased as #subscribe.
45 46 47 48 49 50 |
# File 'lib/z_k/event_handler.rb', line 45
# Register a path with the handler; the block will be called with all
# events on that path.
#
# Wraps the block in an EventHandlerSubscription and appends it to
# @callbacks[path] inside #synchronize.
#
# @param path [Object] key under which the subscription is stored
#   (presumably a znode path String or a state key - confirm with callers)
# @param block [Proc] the callback to store in the subscription
# @return [EventHandlerSubscription] the newly created subscription
def register(path, &block)
  # logger.debug { "EventHandler#register path=#{path.inspect}" }
  EventHandlerSubscription.new(self, path, block).tap do |subscription|
    synchronize { @callbacks[path] << subscription }
  end
end
#register_state_handler(state, &block) {|connection, event| ... } ⇒ Object
registers a “state of the connection” handler
58 59 60 |
# File 'lib/z_k/event_handler.rb', line 58
# Registers a "state of the connection" handler.
#
# Converts +state+ to its callback key with #state_key and delegates to
# #register.
#
# @param state [String, Symbol, Integer] anything accepted by #state_key
# @param block [Proc] the callback to register
# @raise [ArgumentError] propagated from #state_key for an unknown state
# @return [Object] whatever #register returns
def register_state_handler(state, &block)
  register(state_key(state), &block)
end
#safe_call(callbacks, *args) ⇒ Object (protected)
192 193 194 195 196 197 198 199 200 201 |
# File 'lib/z_k/event_handler.rb', line 192
# Invokes every callable in +callbacks+ with +args+, logging (instead of
# propagating) any exception raised by a callback so that the remaining
# callbacks still run.
#
# Entries that do not respond to #call (including nil) are skipped. The
# list is iterated rather than drained with #shift, so a nil/false entry
# no longer ends dispatch early and the caller's array is left untouched.
#
# @param callbacks [Array] objects to invoke
# @param args [Array] arguments forwarded to each callback
# @return [nil]
def safe_call(callbacks, *args)
  callbacks.each do |cb|
    next unless cb.respond_to?(:call)

    begin
      cb.call(*args)
    # NOTE(review): rescuing Exception also traps SystemExit/Interrupt raised
    # inside a callback; kept as-is to preserve behavior - consider narrowing
    # to StandardError.
    rescue Exception => e
      logger.error { "Error caught in user supplied callback" }
      # to_std_format is not a stdlib Exception method; presumably a project
      # extension - confirm.
      logger.error { e.to_std_format }
    end
  end

  nil
end
#setup_watcher!(watch_type, opts) ⇒ Object
implements not only setting up the watcher callback, but deduplicating event delivery. Keeps track of in-flight watcher-type+path requests and doesn’t re-register the watcher with the server until a response has been fired. This prevents one event delivery to every callback per :watch => true argument.
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/z_k/event_handler.rb', line 154
# Sets up the watcher callback for a request while deduplicating watches.
#
# Removes the :watch flag from +opts+. When the flag was truthy and no
# watch of +watch_type+ is already outstanding for opts[:path], records the
# path as outstanding and stores a fresh callback in opts[:watcher]. When a
# watch is already outstanding, +opts+ gets no :watcher entry.
#
# @param watch_type [Symbol] key into @outstanding_watches
# @param opts [Hash] request options; mutated in place
# @raise [KeyError] if +watch_type+ is not a key of @outstanding_watches
# @return [Object, nil] the callback that was installed, otherwise nil
def setup_watcher!(watch_type, opts)
  return unless opts.delete(:watch)

  synchronize do
    # presumably a Set per watch type: add? is truthy only when the path was
    # not already present
    pending = @outstanding_watches.fetch(watch_type)

    # When add? is falsy an outstanding watch for this type+path already
    # exists, so no new watcher is registered.
    # logger.debug { "outstanding watch request for path #{path.inspect} and watcher type #{watch_type.inspect}, not re-registering" }
    opts[:watcher] = watcher_callback if pending.add?(opts[:path])
  end
end
#state_key(arg) ⇒ Object (protected)
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/z_k/event_handler.rb', line 176
# Builds the callback key used for connection-state handlers.
#
# A String or Symbol is upcased and looked up as
# ZookeeperConstants::ZOO_<NAME>_STATE; an Integer is used as given.
#
# @param arg [String, Symbol, Integer] state name or state integer
# @raise [ArgumentError] if the name has no matching constant or +arg+ is
#   of any other type; the backtrace is set to the caller's
# @return [String] "state_<int>"
def state_key(arg)
  int =
    case arg
    when String, Symbol
      ZookeeperConstants.const_get(:"ZOO_#{arg.to_s.upcase}_STATE")
    when Integer
      arg
    else
      # funnel unsupported types into the NameError rescue below so both
      # failure modes produce the same ArgumentError
      raise NameError # ugh lame
    end

  "state_#{int}"
rescue NameError
  raise ArgumentError, "#{arg} is not a valid zookeeper state", caller
end
#synchronize ⇒ Object
:nodoc:
137 138 139 |
# File 'lib/z_k/event_handler.rb', line 137
# Runs the given block while holding @mutex.
#
# @yield the critical section
# @return [Object] the block's return value
def synchronize #:nodoc:
  @mutex.synchronize { yield }
end
#unregister(*args) ⇒ Object Also known as: unsubscribe
use #unsubscribe on the subscription object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/z_k/event_handler.rb', line 74
# @deprecated use #unsubscribe on the subscription object
#
# Removes a previously registered callback. Accepted call shapes:
#
#   unregister(subscription)        # removes that subscription
#   unregister(path, subscription)  # path must be a String; it is ignored
#                                   # in favour of subscription.path
#   unregister(path, index)         # legacy: sets slot +index+ of
#                                   # @callbacks[path] to nil
#
# Only the first matching entry is removed for the subscription forms; an
# unknown subscription is silently ignored.
#
# @return [nil]
def unregister(*args)
  first, second = args

  subscription =
    if first.is_a?(EventHandlerSubscription)
      first
    elsif first.is_a?(String) && second.is_a?(EventHandlerSubscription)
      second
    end

  synchronize do
    if subscription
      ary = @callbacks[subscription.path]
      idx = ary.index(subscription)
      ary.delete_at(idx) if idx
    else
      # legacy path/index form: blank the slot rather than shifting indexes
      @callbacks[first][second] = nil
    end
  end

  nil
end
#unregister_state_handler(*args) ⇒ Object
use #unsubscribe on the subscription object
64 65 66 67 68 69 70 |
# File 'lib/z_k/event_handler.rb', line 64
# @deprecated use #unsubscribe on the subscription object
#
# Removes a connection-state handler by delegating to #unregister.
#
#   unregister_state_handler(subscription)
#   unregister_state_handler(state, index_or_subscription)
#
# In the second form +state+ is converted with #state_key first.
#
# @raise [ArgumentError] propagated from #state_key for an unknown state
# @return [Object] whatever #unregister returns
def unregister_state_handler(*args)
  if args.first.is_a?(EventHandlerSubscription)
    unregister(args.first)
  else
    unregister(state_key(args.first), args[1])
  end
end
#watcher_callback ⇒ Object (protected)
172 173 174 |
# File 'lib/z_k/event_handler.rb', line 172
# Builds a new ZookeeperCallbacks::WatcherCallback whose block forwards the
# event it receives to #process.
#
# @return [Object] whatever ZookeeperCallbacks::WatcherCallback.create
#   returns; a new object is requested on every call (no memoization here)
def watcher_callback
  ZookeeperCallbacks::WatcherCallback.create { |event| process(event) }
end