Class: ZK::NodeDeletionWatcher
- Inherits:
-
Object
- Object
- ZK::NodeDeletionWatcher
- Includes:
- Logger, Zookeeper::Constants
- Defined in:
- lib/zk/node_deletion_watcher.rb
Instance Attribute Summary collapse
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#paths ⇒ Object
readonly
Returns the value of attribute paths.
-
#remaining_paths ⇒ Object
readonly
Returns the value of attribute remaining_paths.
-
#threshold ⇒ Object
readonly
Returns the value of attribute threshold.
-
#watched_paths ⇒ Object
readonly
Returns the value of attribute watched_paths.
-
#zk ⇒ Object
readonly
Returns the value of attribute zk.
Instance Method Summary collapse
- #block_until_deleted(opts = {}) ⇒ Object
- #blocked? ⇒ Boolean
- #done? ⇒ Boolean
-
#initialize(zk, paths, options = {}) ⇒ NodeDeletionWatcher
constructor
Create a new NodeDeletionWatcher that has the ability to block until some or all of the paths given to it have been deleted.
-
#interrupt! ⇒ Object
Causes a thread blocked by us to be awakened and have a WakeUpException raised.
- #timed_out? ⇒ Boolean
-
#wait_until_blocked(timeout = nil) ⇒ true?
This is for testing: it allows us to wait until this object has gone into the blocking state.
Methods included from Logger
#logger, wrapped_logger, wrapped_logger=
Constructor Details
#initialize(zk, paths, options = {}) ⇒ NodeDeletionWatcher
Create a new NodeDeletionWatcher that has the ability to block until some or all of the paths given to it have been deleted.
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/zk/node_deletion_watcher.rb', line 37
#
# Create a new NodeDeletionWatcher that has the ability to block until some
# or all of the paths given to it have been deleted.
#
# @param zk [Object] client handle; stored as-is in @zk
# @param paths [String, Array<String>] a single path (old style) or a list
#   of paths; a String is wrapped in a one-element Array
# @param options [Hash] a shallow copy is kept in @options
# @option options [Integer] :threshold (0) must be an Integer; presumably the
#   number of paths allowed to remain undeleted -- TODO confirm against
#   threshold_met?, which is not visible here
# @raise [ZK::Exceptions::BadArguments] if options[:threshold] is not an Integer
def initialize(zk, paths, options = {})
  paths = [paths] if paths.kind_of?(String) # old style single-node support

  @zk        = zk
  @paths     = paths.dup
  @options   = options.dup
  @threshold = options[:threshold] || 0

  unless @threshold.kind_of?(Integer)
    raise ZK::Exceptions::BadArguments,
          "options[:threshold] must be an Integer. Got #{@threshold.inspect}."
  end

  @watched_paths   = []
  @remaining_paths = paths.dup

  @subs = []

  @mutex = Monitor.new # ffs, 1.8.7 compatibility w/ timeouts
  @cond  = @mutex.new_cond

  # no thread has entered block_until_deleted yet, and no result is recorded
  @blocked = NOT_YET
  @result  = nil
end
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
16 17 18 |
# File 'lib/zk/node_deletion_watcher.rb', line 16
#
# Returns the value of attribute options.
#
# @return [Object] whatever is currently held in @options
def options
  @options
end
#paths ⇒ Object (readonly)
Returns the value of attribute paths.
16 17 18 |
# File 'lib/zk/node_deletion_watcher.rb', line 16
#
# Returns the value of attribute paths.
#
# @return [Object] whatever is currently held in @paths
def paths
  @paths
end
#remaining_paths ⇒ Object (readonly)
Returns the value of attribute remaining_paths.
16 17 18 |
# File 'lib/zk/node_deletion_watcher.rb', line 16
#
# Returns the value of attribute remaining_paths.
#
# @return [Object] whatever is currently held in @remaining_paths
def remaining_paths
  @remaining_paths
end
#threshold ⇒ Object (readonly)
Returns the value of attribute threshold.
16 17 18 |
# File 'lib/zk/node_deletion_watcher.rb', line 16
#
# Returns the value of attribute threshold.
#
# @return [Object] whatever is currently held in @threshold
def threshold
  @threshold
end
#watched_paths ⇒ Object (readonly)
Returns the value of attribute watched_paths.
16 17 18 |
# File 'lib/zk/node_deletion_watcher.rb', line 16
#
# Returns the value of attribute watched_paths.
#
# @return [Object] whatever is currently held in @watched_paths
def watched_paths
  @watched_paths
end
#zk ⇒ Object (readonly)
Returns the value of attribute zk.
16 17 18 |
# File 'lib/zk/node_deletion_watcher.rb', line 16
#
# Returns the value of attribute zk.
#
# @return [Object] whatever is currently held in @zk
def zk
  @zk
end
Instance Method Details
#block_until_deleted(opts = {}) ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/zk/node_deletion_watcher.rb', line 126
#
# Registers callbacks, sets watches, and then blocks the calling thread until
# a result has been recorded in @result (or returns early when
# threshold_met? is already true).
#
# @param opts [Hash]
# @option opts [Object] :timeout passed through to wait_for_result; nil when
#   the key is absent
# @return [true] when the result is :deleted; on the early-exit path, returns
#   whatever finish_blocking returns
# @raise [InvalidStateError] if a result was already recorded before the call
# @raise [ZK::Exceptions::LockWaitTimeoutError] when the result is TIMED_OUT
# @raise [ZK::Exceptions::WakeUpException] when the result is INTERRUPTED
# @raise [Zookeeper::Exceptions::SessionExpired] on ZOO_EXPIRED_SESSION_STATE
# @raise [Zookeeper::Exceptions::NotConnected] on ZOO_CONNECTING_STATE
# @raise [Zookeeper::Exceptions::ConnectionClosed] on ZOO_CLOSED_STATE
# @raise [RuntimeError] for any other result value
def block_until_deleted(opts={})
  timeout = opts[:timeout]

  @mutex.synchronize do
    raise InvalidStateError, "Already fired for #{status_string}" if @result

    register_callbacks
    watch_appropriate_nodes

    # nothing to wait for: skip the blocking phase entirely
    return finish_blocking if threshold_met?

    logger.debug { "ok, going to block: #{status_string}" }

    @blocked = BLOCKED
    @cond.broadcast # wake threads waiting for @blocked to change
    wait_for_result(timeout)
    @blocked = NOT_ANYMORE

    logger.debug { "got result: #{@result.inspect}. #{status_string}" }

    case @result
    when :deleted
      logger.debug { "enough paths were deleted. #{status_string}" }
      return true
    when TIMED_OUT
      raise ZK::Exceptions::LockWaitTimeoutError,
            "timed out waiting for #{timeout.inspect} seconds for deletion of paths. #{status_string}"
    when INTERRUPTED
      raise ZK::Exceptions::WakeUpException
    when ZOO_EXPIRED_SESSION_STATE
      raise Zookeeper::Exceptions::SessionExpired
    when ZOO_CONNECTING_STATE
      raise Zookeeper::Exceptions::NotConnected
    when ZOO_CLOSED_STATE
      raise Zookeeper::Exceptions::ConnectionClosed
    else
      raise "Hit unexpected case in block_until_deleted, result was: #{@result.inspect}. #{status_string}"
    end
  end
ensure
  # runs on every exit path: normal return, early return, or exception
  unregister_callbacks
end
#blocked? ⇒ Boolean
64 65 66 |
# File 'lib/zk/node_deletion_watcher.rb', line 64
#
# Reports, under @mutex, whether @blocked currently equals BLOCKED.
#
# @return [Boolean]
def blocked?
  @mutex.synchronize { @blocked == BLOCKED }
end
#done? ⇒ Boolean
60 61 62 |
# File 'lib/zk/node_deletion_watcher.rb', line 60
#
# Reports, under @mutex, whether a result has been recorded in @result.
#
# @return [Boolean] true for any truthy @result, false for nil/false
def done?
  @mutex.synchronize { !!@result }
end
#interrupt! ⇒ Object
Causes a thread blocked by us to be awakened and have a WakeUpException raised.
If a result has already been delivered, then this does nothing.
If a result has not yet been delivered, any thread calling block_until_deleted will receive the exception immediately.
110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/zk/node_deletion_watcher.rb', line 110
#
# Records INTERRUPTED as the result and wakes every thread waiting on @cond,
# provided @blocked is still NOT_YET or BLOCKED. In any other state this is
# a no-op.
#
# NOTE(review): the guard inspects @blocked, not @result, so a result that
# was recorded while @blocked is still BLOCKED would be overwritten here --
# confirm that is the intended meaning of "already delivered".
#
# @return [Object] on the NOT_YET/BLOCKED path, the value of @cond.broadcast;
#   nil otherwise
def interrupt!
  @mutex.synchronize do
    case @blocked
    when NOT_YET, BLOCKED
      @result = INTERRUPTED
      @cond.broadcast
    else
      return
    end
  end
end
#timed_out? ⇒ Boolean
68 69 70 |
# File 'lib/zk/node_deletion_watcher.rb', line 68
#
# Reports, under @mutex, whether the recorded result equals TIMED_OUT.
#
# @return [Boolean]
def timed_out?
  @mutex.synchronize { @result == TIMED_OUT }
end
#wait_until_blocked(timeout = nil) ⇒ true?
This is for testing: it allows us to wait until this object has gone into the blocking state.
It avoids a race: if we have already been blocked and released, this will not block the caller.
Pass an optional timeout to return after that amount of time, or nil to block forever.
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/zk/node_deletion_watcher.rb', line 84
#
# Waits (at most once) on @cond for @blocked to leave the NOT_YET state.
# Returns immediately when @blocked is already something other than NOT_YET.
#
# NOTE(review): there is a single @cond.wait, so any broadcast on @cond
# (including the one in interrupt!) ends the wait even if @blocked is still
# NOT_YET; the method then returns nil -- confirm callers expect that.
#
# @param timeout [Numeric, nil] passed straight to @cond.wait; nil means no
#   timeout is given to the wait
# @return [true, nil] true if @blocked is no longer NOT_YET, nil otherwise
def wait_until_blocked(timeout=nil)
  @mutex.synchronize do
    return true unless @blocked == NOT_YET

    logger.debug { "#{__method__} @blocked: #{@blocked.inspect} about to wait" }
    @cond.wait(timeout)

    # Whether we were signalled or timed out, the answer is simply the state
    # we observe now: still NOT_YET means the wait did not succeed.
    @blocked == NOT_YET ? nil : true
  end
end