Class: ZK::NodeDeletionWatcher

Inherits:
Object
  • Object
show all
Includes:
Exceptions, Logging, Zookeeper::Constants
Defined in:
lib/zk/node_deletion_watcher.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#logger, set_default

Constructor Details

#initialize(zk, path) ⇒ NodeDeletionWatcher

Returns a new instance of NodeDeletionWatcher.



18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/zk/node_deletion_watcher.rb', line 18

# Sets up a watcher for +path+ using the +zk+ client.
#
# Stores a private copy of +path+, an empty subscription list, and the
# monitor/condition pair used to coordinate waiting threads. The watcher
# starts in the NOT_YET state with no result recorded.
#
# @param zk [Object] client later used for +exists?+ checks on the path
# @param path [#dup] path to watch; duplicated before it is stored
def initialize(zk, path)
  @zk, @path = zk, path.dup
  @subs = []

  # ffs, 1.8.7 compatibility w/ timeouts
  @mutex = Monitor.new
  @cond = @mutex.new_cond

  @blocked, @result = NOT_YET, nil
end

Instance Attribute Details

#path ⇒ Object (readonly)

Returns the value of attribute path.



16
17
18
# File 'lib/zk/node_deletion_watcher.rb', line 16

# Reader for the +@path+ instance variable.
#
# @return [Object] the current value of +@path+
def path
  @path
end

#zk ⇒ Object (readonly)

Returns the value of attribute zk.



16
17
18
# File 'lib/zk/node_deletion_watcher.rb', line 16

# Reader for the +@zk+ instance variable.
#
# @return [Object] the current value of +@zk+
def zk
  @zk
end

Instance Method Details

#block_until_deleted ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/zk/node_deletion_watcher.rb', line 89

# Blocks the calling thread until a result is recorded for the watched path.
#
# Registers callbacks, then checks (with a watch) whether the path exists.
# If it does not, the result is recorded as :deleted right away. Otherwise
# the state moves to BLOCKED and the caller waits on the condition variable
# until @result is set. Callbacks are always unregistered on the way out.
#
# @return [true] when the result is :deleted
# @raise [InvalidStateError] if a result has already been recorded
# @raise [ZK::Exceptions::WakeUpException] if the result is INTERRUPTED
# @raise [Zookeeper::Exceptions::SessionExpired] if the result is ZOO_EXPIRED_SESSION_STATE
# @raise [Zookeeper::Exceptions::NotConnected] if the result is ZOO_CONNECTING_STATE
# @raise [Zookeeper::Exceptions::ConnectionClosed] if the result is ZOO_CLOSED_STATE
# @raise [RuntimeError] for any other result value
def block_until_deleted
  @mutex.synchronize do
    raise InvalidStateError, "Already fired for #{path}" if @result
    register_callbacks

    if zk.exists?(path, :watch => true)
      logger.debug { "ok, going to block: #{path}" }

      @blocked = BLOCKED
      # wake threads waiting for @blocked to change
      @cond.broadcast
      # sleep here until some other party stores a result
      @cond.wait_until { @result }
      @blocked = NOT_ANYMORE
    else
      # nothing to wait for; these are one-shot, so record the outcome now
      @result = :deleted
      @blocked = NOT_ANYMORE
      # wake any waiting threads
      @cond.broadcast
      return true
    end

    # translate the recorded result into the exception to raise, if any
    failure =
      case @result
      when :deleted                  then nil
      when INTERRUPTED               then ZK::Exceptions::WakeUpException
      when ZOO_EXPIRED_SESSION_STATE then Zookeeper::Exceptions::SessionExpired
      when ZOO_CONNECTING_STATE      then Zookeeper::Exceptions::NotConnected
      when ZOO_CLOSED_STATE          then Zookeeper::Exceptions::ConnectionClosed
      else
        raise "Hit unexpected case in block_until_node_deleted, result was: #{@result.inspect}"
      end

    raise failure if failure

    logger.debug { "path #{path} was deleted" }
    return true
  end
ensure
  unregister_callbacks
end

#blocked? ⇒ Boolean

Returns:

  • (Boolean)


35
36
37
# File 'lib/zk/node_deletion_watcher.rb', line 35

# Reports, under the monitor, whether the watcher is currently in the
# BLOCKED state.
#
# @return [Boolean] true only while @blocked equals BLOCKED
def blocked?
  @mutex.synchronize do
    state = @blocked
    state == BLOCKED
  end
end

#done? ⇒ Boolean

Returns:

  • (Boolean)


31
32
33
# File 'lib/zk/node_deletion_watcher.rb', line 31

# Reports, under the monitor, whether a result has been recorded.
#
# @return [Boolean] true once @result holds any truthy value
def done?
  @mutex.synchronize do
    @result ? true : false
  end
end

#interrupt! ⇒ Object

Causes a thread blocked on us to be awakened and have a WakeUpException raised.

if a result has already been delivered, then this does nothing

if a result has not yet been delivered, any thread calling block_until_deleted will receive the exception immediately



77
78
79
80
81
82
83
84
85
86
87
# File 'lib/zk/node_deletion_watcher.rb', line 77

# Records INTERRUPTED as the result and wakes every thread waiting on the
# condition variable, so that a thread inside block_until_deleted raises
# ZK::Exceptions::WakeUpException.
#
# Does nothing when a result has already been recorded, or when the watcher
# is in neither the NOT_YET nor the BLOCKED state.
#
# @return [Object, nil] the value of the broadcast when the interrupt was
#   recorded, nil when nothing was done
def interrupt!
  @mutex.synchronize do
    # A result may already be stored while @blocked is still BLOCKED: the
    # waiter has been signalled but has not yet re-acquired the monitor.
    # Overwriting it here would turn a delivered result into a spurious
    # WakeUpException, so the first recorded result wins.
    return if @result

    case @blocked
    when NOT_YET, BLOCKED
      @result = INTERRUPTED
      @cond.broadcast
    else
      return
    end
  end
end

#wait_until_blocked(timeout = nil) ⇒ true?

this is for testing, allows us to wait until this object has gone into blocking state.

Avoids a race: if we have already been blocked and released, this will not block the caller.

pass optional timeout to return after that amount of time or nil to block forever

Returns:

  • (true)

    if we have been blocked previously or are currently blocked,

  • (nil)

    if we timeout



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/zk/node_deletion_watcher.rb', line 51

# Testing aid: waits until this watcher has left the NOT_YET state.
#
# Returns immediately if the watcher is blocked or has been blocked before.
# Otherwise waits on the condition variable. A wakeup that finds nothing
# changed and time still remaining goes back to waiting rather than being
# reported as a timeout.
#
# @param timeout [Numeric, nil] seconds to wait, or nil for no time limit
# @return [true] if @blocked is no longer NOT_YET
# @return [nil] if @blocked is still NOT_YET when the wait ends, either
#   because the timeout elapsed or because a result was recorded without
#   the watcher ever blocking
def wait_until_blocked(timeout=nil)
  @mutex.synchronize do
    return true unless @blocked == NOT_YET

    deadline = timeout ? (Time.now + timeout) : nil

    logger.debug { "#{__method__} @blocked: #{@blocked.inspect} about to wait" }

    loop do
      # never hand a negative interval to the condition variable
      remaining = deadline ? [deadline - Time.now, 0].max : nil
      @cond.wait(remaining)

      break unless @blocked == NOT_YET           # the state we wait for
      break if @result                           # result recorded, will never block
      break if deadline && Time.now >= deadline  # genuinely timed out
      # otherwise the wakeup was spurious: wait for the remaining time
    end

    (@blocked == NOT_YET) ? nil : true
  end
end