Class: ZK::NodeDeletionWatcher
- Inherits:
-
Object
- Object
- ZK::NodeDeletionWatcher
- Includes:
- Exceptions, Logging, Zookeeper::Constants
- Defined in:
- lib/zk/node_deletion_watcher.rb
Instance Attribute Summary collapse
-
#path ⇒ Object
readonly
Returns the value of attribute path.
-
#zk ⇒ Object
readonly
Returns the value of attribute zk.
Instance Method Summary collapse
- #block_until_deleted(opts = {}) ⇒ Object
- #blocked? ⇒ Boolean
- #done? ⇒ Boolean
-
#initialize(zk, path) ⇒ NodeDeletionWatcher
constructor
A new instance of NodeDeletionWatcher.
-
#interrupt! ⇒ Object
Causes a thread blocked by us to be awakened and have a WakeUpException raised.
- #timed_out? ⇒ Boolean
-
#wait_until_blocked(timeout = nil) ⇒ true?
This is for testing; it allows us to wait until this object has gone into the blocking state.
Methods included from Logging
Constructor Details
#initialize(zk, path) ⇒ NodeDeletionWatcher
Returns a new instance of NodeDeletionWatcher.
19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/zk/node_deletion_watcher.rb', line 19

# Builds a watcher for a single node path.
#
# @param zk [Object] client handle; stored as given and exposed through #zk
# @param path [String] node path; a private copy (via #dup) is kept
def initialize(zk, path)
  # Synchronisation primitives. A Monitor is used here for 1.8.7
  # compatibility with timeouts; the condition is bound to that monitor.
  @mutex = Monitor.new
  @cond  = @mutex.new_cond

  # What is being watched, and through which client.
  @zk   = zk
  @path = path.dup
  @subs = []

  # Initial state: nobody has blocked yet and no result has been recorded.
  @blocked = NOT_YET
  @result  = nil
end
Instance Attribute Details
#path ⇒ Object (readonly)
Returns the value of attribute path.
17 18 19 |
# File 'lib/zk/node_deletion_watcher.rb', line 17

# Read-only accessor for the watched path.
#
# @return [Object] the current value of @path
def path
  @path
end
#zk ⇒ Object (readonly)
Returns the value of attribute zk.
17 18 19 |
# File 'lib/zk/node_deletion_watcher.rb', line 17

# Read-only accessor for the client handle given to the constructor.
#
# @return [Object] the current value of @zk
def zk
  @zk
end
Instance Method Details
#block_until_deleted(opts = {}) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/zk/node_deletion_watcher.rb', line 98

# Blocks the calling thread until the watched node no longer exists.
#
# While holding @mutex: registers callbacks, then checks for the node with a
# watch set. If the node is already absent the result is recorded and the
# method returns at once; otherwise the state is switched to BLOCKED and the
# method waits in wait_for_result until a result has been recorded.
# Callbacks are always unregistered on the way out, including on error.
#
# @param opts [Hash] options
# @option opts [Object] :timeout handed to wait_for_result and reported
#   (as seconds) in the timeout error message
#
# @return [true] if the node was absent, or was deleted while waiting
#
# @raise [InvalidStateError] if a result has already been recorded
# @raise [ZK::Exceptions::LockWaitTimeoutError] if the result is TIMED_OUT
# @raise [ZK::Exceptions::WakeUpException] if the result is INTERRUPTED
# @raise [Zookeeper::Exceptions::SessionExpired] if the result is
#   ZOO_EXPIRED_SESSION_STATE
# @raise [Zookeeper::Exceptions::NotConnected] if the result is
#   ZOO_CONNECTING_STATE
# @raise [Zookeeper::Exceptions::ConnectionClosed] if the result is
#   ZOO_CLOSED_STATE
# @raise [RuntimeError] if the result is anything else
def block_until_deleted(opts={})
  timeout = opts[:timeout]

  @mutex.synchronize do
    raise InvalidStateError, "Already fired for #{path}" if @result
    register_callbacks

    unless zk.exists?(path, :watch => true)
      # we are done, these are one-shot, so write the results
      @result = :deleted
      @blocked = NOT_ANYMORE
      @cond.broadcast # wake any waiting threads
      return true
    end

    logger.debug { "ok, going to block: #{path}" }

    @blocked = BLOCKED
    @cond.broadcast # wake threads waiting for @blocked to change
    wait_for_result(timeout)

    @blocked = NOT_ANYMORE

    logger.debug { "got result for path: #{path}, result: #{@result.inspect}" }

    case @result
    when :deleted
      logger.debug { "path #{path} was deleted" }
      return true
    when TIMED_OUT
      raise ZK::Exceptions::LockWaitTimeoutError, "timed out waiting for #{timeout.inspect} seconds for deletion of path: #{path.inspect}"
    when INTERRUPTED
      raise ZK::Exceptions::WakeUpException
    when ZOO_EXPIRED_SESSION_STATE
      raise Zookeeper::Exceptions::SessionExpired
    when ZOO_CONNECTING_STATE
      raise Zookeeper::Exceptions::NotConnected
    when ZOO_CLOSED_STATE
      raise Zookeeper::Exceptions::ConnectionClosed
    else
      # The message names this method so the failure can be located by grep.
      raise "Hit unexpected case in block_until_deleted, result was: #{@result.inspect}"
    end
  end
ensure
  unregister_callbacks
end
#blocked? ⇒ Boolean
36 37 38 |
# File 'lib/zk/node_deletion_watcher.rb', line 36

# Reports whether the watcher is currently in the BLOCKED state.
# The state is read while holding @mutex.
#
# @return [Boolean] true when @blocked equals BLOCKED
def blocked?
  @mutex.synchronize do
    @blocked == BLOCKED
  end
end
#done? ⇒ Boolean
32 33 34 |
# File 'lib/zk/node_deletion_watcher.rb', line 32

# Reports whether a result has been recorded.
# The result is read while holding @mutex.
#
# @return [Boolean] true when @result is neither nil nor false
def done?
  @mutex.synchronize do
    @result ? true : false
  end
end
#interrupt! ⇒ Object
Causes a thread blocked by us to be awakened and have a WakeUpException raised.
If a result has already been delivered, this does nothing.
If a result has not yet been delivered, any thread calling block_until_deleted will receive the exception immediately.
82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/zk/node_deletion_watcher.rb', line 82

# Records an INTERRUPTED result and wakes every thread waiting on the
# condition, but only while the state is still NOT_YET or BLOCKED.
# In any other state nothing is changed.
#
# @return [Object, nil] the value of @cond.broadcast when the interrupt was
#   recorded, nil when nothing was done
def interrupt!
  @mutex.synchronize do
    still_pending = (@blocked == NOT_YET) || (@blocked == BLOCKED)
    return unless still_pending

    @result = INTERRUPTED
    @cond.broadcast
  end
end
#timed_out? ⇒ Boolean
40 41 42 |
# File 'lib/zk/node_deletion_watcher.rb', line 40

# Reports whether the recorded result is TIMED_OUT.
# The result is read while holding @mutex.
#
# @return [Boolean] true when @result equals TIMED_OUT
def timed_out?
  @mutex.synchronize do
    @result == TIMED_OUT
  end
end
#wait_until_blocked(timeout = nil) ⇒ true?
This is for testing; it allows us to wait until this object has gone into the blocking state.
This avoids a race: if we have already been blocked and released, this call will not block the caller.
Pass an optional timeout to return after that amount of time, or nil to block forever.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/zk/node_deletion_watcher.rb', line 56

# Waits until this watcher has left the NOT_YET state.
#
# Returns immediately when the state is already something other than
# NOT_YET. Otherwise performs a single wait on the condition and then
# reports what the state is at wake-up.
#
# @param timeout [Object, nil] passed straight to @cond.wait; nil means no
#   time limit on the wait
#
# @return [true] if the state is no longer NOT_YET
# @return [nil] if the state is still NOT_YET after the wait returned
def wait_until_blocked(timeout=nil)
  @mutex.synchronize do
    return true unless @blocked == NOT_YET

    logger.debug { "#{__method__} @blocked: #{@blocked.inspect} about to wait" }
    @cond.wait(timeout)

    # Whether the wait ended by timeout or by a broadcast, the answer depends
    # only on the state observed now, so no deadline bookkeeping is needed.
    (@blocked == NOT_YET) ? nil : true
  end
end