Class: ZK::NodeDeletionWatcher

Inherits:
Object
  • Object
show all
Includes:
Exceptions, Logging, Zookeeper::Constants
Defined in:
lib/zk/node_deletion_watcher.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#logger, set_default

Constructor Details

#initialize(zk, path) ⇒ NodeDeletionWatcher

Returns a new instance of NodeDeletionWatcher.



19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/zk/node_deletion_watcher.rb', line 19

# Sets up a watcher for +path+ using the client +zk+.
#
# @param zk the client object later used for the existence check
# @param path the node path to watch; a duplicate is stored so later
#   changes to the caller's object do not affect this watcher
def initialize(zk, path)
  # Monitor (not a plain Mutex) is used for 1.8.7 compatibility with timeouts
  @mutex = Monitor.new
  @cond = @mutex.new_cond

  @zk = zk
  @path = path.dup
  @subs = []

  # nothing has blocked yet and no result has been recorded
  @blocked = NOT_YET
  @result = nil
end

Instance Attribute Details

#path ⇒ Object (readonly)

Returns the value of attribute path.



17
18
19
# File 'lib/zk/node_deletion_watcher.rb', line 17

# @return the value held in @path (set by the constructor)
def path; @path; end

#zk ⇒ Object (readonly)

Returns the value of attribute zk.



17
18
19
# File 'lib/zk/node_deletion_watcher.rb', line 17

# @return the value held in @zk (set by the constructor)
def zk; @zk; end

Instance Method Details

#block_until_deleted(opts = {}) ⇒ Object

Parameters:

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :timeout (Numeric) — default: nil

    If a positive number, the duration in seconds after which, if the node has not been deleted (i.e. we have not acquired the lock), a LockWaitTimeoutError is raised in all waiting threads.



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/zk/node_deletion_watcher.rb', line 98

# Blocks the caller until a result is recorded for +path+, then returns or
# raises according to that result. Whatever happens, +unregister_callbacks+
# runs afterwards (see the +ensure+ clause).
#
# @param opts [Hash] options
# @option opts [Numeric, nil] :timeout seconds, handed to +wait_for_result+
#   (NOTE(review): nil presumably means "wait without limit" -- confirm in
#   +wait_for_result+)
# @return [true] if the node does not exist when checked, or the recorded
#   result is +:deleted+
# @raise [InvalidStateError] if a result has already been recorded
# @raise [ZK::Exceptions::LockWaitTimeoutError] if the result is TIMED_OUT
# @raise [ZK::Exceptions::WakeUpException] if the result is INTERRUPTED
# @raise [Zookeeper::Exceptions::SessionExpired] if the result is ZOO_EXPIRED_SESSION_STATE
# @raise [Zookeeper::Exceptions::NotConnected] if the result is ZOO_CONNECTING_STATE
# @raise [Zookeeper::Exceptions::ConnectionClosed] if the result is ZOO_CLOSED_STATE
# @raise [RuntimeError] if the result is anything else
def block_until_deleted(opts={})
  timeout = opts[:timeout]

  @mutex.synchronize do
    # a watcher delivers at most one result; refuse to be reused
    raise InvalidStateError, "Already fired for #{path}" if @result
    # callbacks are registered before the existence check below
    # (presumably so an event arriving in between is not missed -- confirm
    # in register_callbacks)
    register_callbacks

    unless zk.exists?(path, :watch => true)
      # we are done, these are one-shot, so write the results
      @result = :deleted
      @blocked = NOT_ANYMORE
      @cond.broadcast # wake any waiting threads
      return true
    end

    logger.debug { "ok, going to block: #{path}" }

    @blocked = BLOCKED
    @cond.broadcast                 # wake threads waiting for @blocked to change

    # returns once @result is available (how it is populated is defined in
    # wait_for_result / the registered callbacks, not visible here)
    wait_for_result(timeout)

    @blocked = NOT_ANYMORE

    logger.debug { "got result for path: #{path}, result: #{@result.inspect}" } 

    # translate the recorded result into a return value or an exception
    case @result
    when :deleted
      logger.debug { "path #{path} was deleted" }
      return true
    when TIMED_OUT
      raise ZK::Exceptions::LockWaitTimeoutError, "timed out waiting for #{timeout.inspect} seconds for deletion of path: #{path.inspect}" 
    when INTERRUPTED
      raise ZK::Exceptions::WakeUpException
    when ZOO_EXPIRED_SESSION_STATE
      raise Zookeeper::Exceptions::SessionExpired
    when ZOO_CONNECTING_STATE
      raise Zookeeper::Exceptions::NotConnected
    when ZOO_CLOSED_STATE
      raise Zookeeper::Exceptions::ConnectionClosed
    else
      raise "Hit unexpected case in block_until_node_deleted, result was: #{@result.inspect}"
    end
  end
ensure
  # runs on every exit path, including the early InvalidStateError above
  unregister_callbacks
end

#blocked? ⇒ Boolean

Returns:

  • (Boolean)


36
37
38
# File 'lib/zk/node_deletion_watcher.rb', line 36

# @return [Boolean] true while the recorded state is BLOCKED
def blocked?
  # read the state under the monitor, compare once it is in hand
  state = @mutex.synchronize { @blocked }
  state == BLOCKED
end

#done? ⇒ Boolean

Returns:

  • (Boolean)


32
33
34
# File 'lib/zk/node_deletion_watcher.rb', line 32

# @return [Boolean] true once any result has been recorded
def done?
  @mutex.synchronize do
    @result ? true : false
  end
end

#interrupt! ⇒ Object

Causes a thread blocked by this watcher to be awakened and to have a WakeUpException raised in it.

If a result has already been delivered, this does nothing.

If a result has not yet been delivered, any thread calling block_until_deleted will receive the exception immediately.



82
83
84
85
86
87
88
89
90
91
92
# File 'lib/zk/node_deletion_watcher.rb', line 82

# Records INTERRUPTED as the result and wakes every thread waiting on the
# condition variable, but only while the state is NOT_YET or BLOCKED.
# In any other state nothing is changed.
#
# @return [nil] when nothing was done; otherwise whatever +broadcast+ returns
def interrupt!
  @mutex.synchronize do
    # only a watcher that has not finished blocking can be interrupted
    return unless [NOT_YET, BLOCKED].include?(@blocked)

    @result = INTERRUPTED
    @cond.broadcast
  end
end

#timed_out? ⇒ Boolean

Returns:

  • (Boolean)


40
41
42
# File 'lib/zk/node_deletion_watcher.rb', line 40

# @return [Boolean] true when the recorded result is TIMED_OUT
def timed_out?
  # read the result under the monitor, compare once it is in hand
  outcome = @mutex.synchronize { @result }
  outcome == TIMED_OUT
end

#wait_until_blocked(timeout = nil) ⇒ true?

This is for testing: it allows the caller to wait until this object has entered the blocking state.

It avoids a race: if this object has already been blocked and released, the call returns immediately instead of blocking the caller.

Pass an optional timeout to return after that many seconds, or nil to block forever.

Returns:

  • (true)

    if we have been blocked previously or are currently blocked,

  • (nil)

    if we timeout



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/zk/node_deletion_watcher.rb', line 56

# Waits until this watcher has left the NOT_YET state, i.e. it is blocked
# now or has been blocked (or finished) before. Intended for tests.
#
# The wait ends early, returning nil, if a result is recorded while the
# state is still NOT_YET (e.g. by +interrupt!+): +block_until_deleted+
# refuses to run once a result exists, so the state could never change.
#
# @param timeout [Numeric, nil] maximum number of seconds to wait, or nil
#   to wait without limit
# @return [true] if the state is no longer NOT_YET
# @return [nil] if the timeout elapsed, or a result was recorded, while the
#   state was still NOT_YET
def wait_until_blocked(timeout=nil)
  @mutex.synchronize do
    return true unless @blocked == NOT_YET

    deadline = timeout ? (Time.now + timeout) : nil

    logger.debug { "#{__method__} @blocked: #{@blocked.inspect} about to wait" }

    # The condition variable is shared with other state changes and may be
    # signalled without @blocked changing, so the predicate is re-checked
    # after every wakeup and only the remaining time is waited for.
    while @blocked == NOT_YET && !@result
      if deadline
        remaining = deadline - Time.now
        break if remaining <= 0
        @cond.wait(remaining)
      else
        @cond.wait
      end
    end

    (@blocked == NOT_YET) ? nil : true
  end
end