Class: ZK::NodeDeletionWatcher

Inherits:
Object
  • Object
show all
Includes:
Logger, Zookeeper::Constants
Defined in:
lib/zk/node_deletion_watcher.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logger

#logger, wrapped_logger, wrapped_logger=

Constructor Details

#initialize(zk, paths, options = {}) ⇒ NodeDeletionWatcher

Create a new NodeDeletionWatcher that has the ability to block until some or all of the paths given to it have been deleted.

Parameters:

  • zk (ZK::Client)
  • paths (Array)
    • one or more paths to watch
  • optional (Hash)

    options - Symbol-keyed hash

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :threshold (Integer, false, nil) — default: 0

    the number of remaining nodes allowed when determining whether or not to continue blocking. If false or nil are provided, the default will be substituted.

Raises:

  • (ZK::Exceptions::BadArguments) — if options[:threshold] is given and is not an Integer


37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/zk/node_deletion_watcher.rb', line 37

# Create a new NodeDeletionWatcher that can block until some or all of the
# given paths have been deleted.
#
# @param zk [ZK::Client] the client stored for later use by this watcher
# @param paths [Array<String>, String] one or more paths to watch; a lone
#   String is wrapped in an Array (old style single-node support)
# @param options [Hash] Symbol-keyed options; a copy is kept in #options
# @option options [Integer, false, nil] :threshold (0) the number of remaining
#   nodes allowed when deciding whether to keep blocking. false or nil fall
#   back to the default.
# @raise [ZK::Exceptions::BadArguments] if :threshold is not an Integer
def initialize(zk, paths, options={})
  paths = [paths] if paths.kind_of? String # old style single-node support

  @zk         = zk
  @paths      = paths.dup
  @options    = options.dup
  @threshold  = options[:threshold] || 0

  # Plain string rather than a heredoc: the message previously carried the
  # heredoc's indentation, a trailing newline and a stray closing quote.
  unless @threshold.kind_of? Integer
    raise ZK::Exceptions::BadArguments,
      "options[:threshold] must be an Integer. Got #{@threshold.inspect}."
  end

  @watched_paths   = []
  @remaining_paths = paths.dup # separate copy so @paths is never shared with it

  @subs   = []

  # Monitor (not Mutex) was chosen for 1.8.7 compatibility with timeouts.
  @mutex  = Monitor.new
  @cond   = @mutex.new_cond

  @blocked  = NOT_YET
  @result   = nil
end

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



16
17
18
# File 'lib/zk/node_deletion_watcher.rb', line 16

# Read-only accessor for the options hash kept by this watcher.
# The constructor stores a `dup` of the hash it was given, so this is not
# the caller's original object.
#
# @return [Hash] the stored options
def options
  @options
end

#paths ⇒ Object (readonly)

Returns the value of attribute paths.



16
17
18
# File 'lib/zk/node_deletion_watcher.rb', line 16

# Read-only accessor for the paths this watcher was created with.
# The constructor stores a `dup` of its `paths` argument (a single String
# argument is wrapped in an Array first).
#
# @return [Array] the stored paths
def paths
  @paths
end

#remaining_paths ⇒ Object (readonly)

Returns the value of attribute remaining_paths.



16
17
18
# File 'lib/zk/node_deletion_watcher.rb', line 16

# Read-only accessor for @remaining_paths, which the constructor initializes
# to its own copy of the path list.
# NOTE(review): presumably entries are removed as nodes are deleted; the code
# that mutates this list is not visible here, so confirm.
#
# @return [Array] the stored remaining-paths list
def remaining_paths
  @remaining_paths
end

#threshold ⇒ Object (readonly)

Returns the value of attribute threshold.



16
17
18
# File 'lib/zk/node_deletion_watcher.rb', line 16

# Read-only accessor for the threshold chosen at construction time:
# `options[:threshold]`, or 0 when that value is nil or false.
#
# @return [Integer] the stored threshold
def threshold
  @threshold
end

#watched_paths ⇒ Object (readonly)

Returns the value of attribute watched_paths.



16
17
18
# File 'lib/zk/node_deletion_watcher.rb', line 16

# Read-only accessor for @watched_paths, which starts out as an empty Array.
# NOTE(review): presumably filled in as watches are set on nodes; the code
# that populates it is not visible here, so confirm.
#
# @return [Array] the stored watched-paths list
def watched_paths
  @watched_paths
end

#zk ⇒ Object (readonly)

Returns the value of attribute zk.



16
17
18
# File 'lib/zk/node_deletion_watcher.rb', line 16

# Read-only accessor for the client object handed to the constructor.
#
# @return [Object] the stored `zk` argument, unchanged
def zk
  @zk
end

Instance Method Details

#block_until_deleted(opts = {}) ⇒ Object

Parameters:

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :timeout (Numeric) — default: nil

    if a positive integer, represents a duration in seconds after which, if the threshold has not been met, a LockWaitTimeoutError will be raised in all waiting threads.



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/zk/node_deletion_watcher.rb', line 126

# Block the calling thread until a result is recorded for this watcher, then
# translate that result into a return value or an exception.
#
# The whole body runs while holding @mutex. Callbacks are always unregistered
# on the way out, whether the method returns or raises.
#
# @param opts [Hash] a customizable set of options
# @option opts [Numeric] :timeout (nil) handed to wait_for_result and echoed
#   in the timeout error message
# @return [true] when the recorded result is :deleted
# @return [Object] the value of finish_blocking when the threshold is already
#   met before any waiting happens
# @raise [InvalidStateError] if a result was already recorded on entry
# @raise [ZK::Exceptions::LockWaitTimeoutError] if the result is TIMED_OUT
# @raise [ZK::Exceptions::WakeUpException] if the result is INTERRUPTED
# @raise [Zookeeper::Exceptions::SessionExpired] if the result is
#   ZOO_EXPIRED_SESSION_STATE
# @raise [Zookeeper::Exceptions::NotConnected] if the result is
#   ZOO_CONNECTING_STATE
# @raise [Zookeeper::Exceptions::ConnectionClosed] if the result is
#   ZOO_CLOSED_STATE
# @raise [RuntimeError] for any other result value
def block_until_deleted(opts={})
  timeout = opts[:timeout]

  @mutex.synchronize do
    # A watcher delivers at most one result; refuse to be reused.
    raise InvalidStateError, "Already fired for #{status_string}" if @result
    register_callbacks

    watch_appropriate_nodes

    # Nothing to wait for if the threshold is met right after setting watches.
    return finish_blocking if threshold_met?

    logger.debug { "ok, going to block: #{status_string}" }

    @blocked = BLOCKED
    @cond.broadcast                 # wake threads waiting for @blocked to change

    wait_for_result(timeout)

    @blocked = NOT_ANYMORE

    logger.debug { "got result: #{@result.inspect}. #{status_string}" }

    case @result
    when :deleted
      logger.debug { "enough paths were deleted. #{status_string}" }
      return true
    when TIMED_OUT
      raise ZK::Exceptions::LockWaitTimeoutError,
        "timed out waiting for #{timeout.inspect} seconds for deletion of paths. #{status_string}"
    when INTERRUPTED
      raise ZK::Exceptions::WakeUpException
    when ZOO_EXPIRED_SESSION_STATE
      raise Zookeeper::Exceptions::SessionExpired
    when ZOO_CONNECTING_STATE
      raise Zookeeper::Exceptions::NotConnected
    when ZOO_CLOSED_STATE
      raise Zookeeper::Exceptions::ConnectionClosed
    else
      # Message names this method (it previously named a method that does not exist).
      raise "Hit unexpected case in block_until_deleted, result was: #{@result.inspect}. #{status_string}"
    end
  end
ensure
  unregister_callbacks
end

#blocked? ⇒ Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/zk/node_deletion_watcher.rb', line 64

# Report whether the watcher's state flag is currently BLOCKED.
# The flag is read while holding @mutex.
#
# @return [Boolean]
def blocked?
  @mutex.synchronize do
    current_state = @blocked
    current_state == BLOCKED
  end
end

#done? ⇒ Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/zk/node_deletion_watcher.rb', line 60

# Report whether any result has been recorded yet.
# @result is read while holding @mutex and coerced to a strict boolean.
#
# @return [Boolean] true when @result is neither nil nor false
def done?
  @mutex.synchronize do
    @result ? true : false
  end
end

#interrupt! ⇒ Object

cause a thread blocked by us to be awakened and have a WakeUpException raised.

if a result has already been delivered, then this does nothing

if a result has not yet been delivered, any thread calling block_until_deleted will receive the exception immediately



110
111
112
113
114
115
116
117
118
119
120
# File 'lib/zk/node_deletion_watcher.rb', line 110

# Record an INTERRUPTED result and wake every thread waiting on @cond.
#
# This only acts while the state flag is NOT_YET or BLOCKED; in any other
# state the method returns nil without touching @result.
#
# @return [Object, nil] the value of @cond.broadcast when the interrupt was
#   recorded, nil otherwise
def interrupt!
  @mutex.synchronize do
    interruptible =
      case @blocked
      when NOT_YET, BLOCKED then true
      else false
      end

    return unless interruptible

    @result = INTERRUPTED
    @cond.broadcast
  end
end

#timed_out? ⇒ Boolean

Returns:

  • (Boolean)


68
69
70
# File 'lib/zk/node_deletion_watcher.rb', line 68

# Report whether the recorded result is TIMED_OUT.
# @result is read while holding @mutex.
#
# @return [Boolean]
def timed_out?
  @mutex.synchronize do
    outcome = @result
    outcome == TIMED_OUT
  end
end

#wait_until_blocked(timeout = nil) ⇒ true?

this is for testing, allows us to wait until this object has gone into blocking state.

This avoids a race: if we have already been blocked and released, the call returns true immediately instead of blocking the caller.

pass optional timeout to return after that amount of time or nil to block forever

Returns:

  • (true)

    if we have been blocked previously or are currently blocked,

  • (nil)

    if we timeout



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/zk/node_deletion_watcher.rb', line 84

# Testing aid: wait until this watcher has entered (or already passed
# through) its blocking state.
#
# Returns true straight away if the state flag is anything other than
# NOT_YET. Otherwise it performs a single wait on @cond and reports what the
# flag looks like afterwards.
#
# NOTE(review): the wait is not retried, so any broadcast on @cond that leaves
# the flag at NOT_YET (interrupt! issues one) makes this return nil before the
# timeout has elapsed.
#
# @param timeout [Numeric, nil] seconds to wait, or nil for no time limit
# @return [true] if we are currently blocked or have been blocked previously
# @return [nil] if the flag is still NOT_YET when the wait ends
def wait_until_blocked(timeout=nil)
  @mutex.synchronize do
    return true unless @blocked == NOT_YET

    logger.debug { "#{__method__} @blocked: #{@blocked.inspect} about to wait" }
    @cond.wait(timeout)

    # No separate deadline check: "still NOT_YET" is nil either way.
    (@blocked == NOT_YET) ? nil : true
  end
end