Class: Distlock::ZK::ExclusiveLock

Inherits:
Object
  • Object
show all
Includes:
Common
Defined in:
lib/distlock/zk/exclusive_lock.rb

Constant Summary collapse

WATCHER_TIMEOUT =

seconds

60

Instance Method Summary collapse

Methods included from Common

#close, #create_sequenced_ephemeral, #exists?, #logger, #logger=, #safe_create, #zk

Constructor Details

#initialize(options = {}) ⇒ ExclusiveLock

Returns a new instance of ExclusiveLock.



8
9
10
11
# File 'lib/distlock/zk/exclusive_lock.rb', line 8

def initialize(options={})
  defaults = {:host => "localhost:2181", :timeout => 10, :root_path => "/lock/exclusive/default"}
  @options = defaults.merge(options)
end

Instance Method Details

#_get_lock(lock) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/distlock/zk/exclusive_lock.rb', line 82

def _get_lock(lock)  
  logger.debug "_get_lock: entered for #{lock}"

  while !@owner

    path = lock.split('/')[0...-1].join('/')
  
    # TODO - pass children in as parameter?
    children = zk.get_children(:path => path)[:children].sort{|a,b|a.split('-').last <=> b.split('-').last}

    lock_last = lock.split('/').last
    lock_idx = children.index(lock_last)

    if lock_idx.nil?
      logger.error("Distlock::ZK::ExclusiveLock#_get_lock: failed to find our lock in the node children (connection reset?)")
      raise LockError.new("failed to find our lock in the node children (connection reset?)")
    elsif lock_idx == 0  
      logger.debug "lock acquired (client id - #{zk.client_id}), lock - #{lock}"
      @owner = true
      return true
    else
      logger.debug "Distlock::ZK::ExclusiveLock#_get_lock: lock contention for #{lock} - #{children.inspect} (my client id - #{zk.client_id})"
      logger.info "Distlock::ZK::ExclusiveLock#_get_lock: lock contention - #{lock}"

      to_watch = "#{path}/#{children[lock_idx-1]}"
      logger.debug "about to set watch on - #{to_watch}"

      # 2-step process so we minimise the chance of setting watches on the node if it does not exist for any reason
      @watcher_called=false
      @watcher = Zookeeper::WatcherCallback.new { do_watcher(@watcher, lock) }
      resp = zk.stat(:path => to_watch)
      resp = zk.stat(:path => to_watch, :watcher => @watcher) if resp[:stat].exists

      if resp[:stat].exists
        logger.info "Distlock::ZK::ExclusiveLock#_get_lock: watcher set, node exists, watching - #{to_watch}, our lock - #{lock}"
        start_time = Time.now
        while !@watcher_called
          sleep 0.1

          if (start_time + WATCHER_TIMEOUT) < Time.now
            logger.error("Distlock::ZK::ExclusiveLock#_get_lock: timed out while watching - #{to_watch}, our lock - #{lock}, closing session and bombing out")
            close
            raise LockError.new("Distlock::ZK::ExclusiveLock#_get_lock timed out while waiting for watcher")
          end
        end
      else
        logger.error("Distlock::ZK::ExclusiveLock#_get_lock: node we are watching does not exist, closing session, lock - #{lock}")
        close
        raise LockError.new("node we tried to watch does not exist")
      end
    end
  end
end

#check_for_existing_lock(path) ⇒ Object



40
41
42
43
44
45
46
47
48
49
# File 'lib/distlock/zk/exclusive_lock.rb', line 40

def check_for_existing_lock(path)
  children = zk.get_children(:path => path)[:children].sort{|a,b|a.split('-').last <=> b.split('-').last}
  children.detect do |child| 
    logger.debug "checking existing lock for our client_id - #{child} vs. #{zk.client_id}"
    if child.split('-')[1] == "#{zk.client_id}"
      logger.debug "found existing lock for client_id #{zk.client_id}, lock - #{child}, reusing"
      return "#{path}/#{child}"
    end
  end
end

#do_watcher(watcher, lock) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/distlock/zk/exclusive_lock.rb', line 21

def do_watcher(watcher, lock)
  if watcher.type == ZookeeperConstants::ZOO_DELETED_EVENT
    logger.debug "Distlock::ZK::ExclusiveLock#do_watcher: watcher called for delete of node - #{lock}"
  else
    if watcher.type == ZookeeperConstants::ZOO_CREATED_EVENT
      logger.error "Distlock::ZK::ExclusiveLock#do_watcher: watcher called for creation of node (should never happen for an ephemeral node?) - #{lock}"
    elsif watcher.type == ZookeeperConstants::ZOO_SESSION_EVENT
      logger.error "Distlock::ZK::ExclusiveLock#do_watcher: watcher called for zoo session event, closing the session for this client - #{lock}"
    else
      logger.error "Distlock::ZK::ExclusiveLock#do_watcher: watcher called for unexpected event, closing the session for this client - #{lock}, event - #{watcher.type}"
    end

    close
    raise LockError.new("Distlock::ZK::ExclusiveLock#do_watcher: got an unexpected watcher type - #{watcher.type}")
  end

  @watcher_called=true
end

#lock(path) ⇒ Object

Raises:



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/distlock/zk/exclusive_lock.rb', line 51

def lock(path)
  raise LockError.new("invalid lock path, must start with '/'") unless path.start_with?("/")
  @owner = false
  
  safe_create(path)
  
  # TODO - combine these into a single method like find_or_create
  lock = check_for_existing_lock(path)
  lock = create_sequenced_ephemeral(path) unless lock
  
  logger.debug "my lock path - #{lock}"
  @my_lock = lock
  result = _get_lock(lock)
  logger.info("Distlock::ZK::ExclusiveLock#lock: lock acquired - #{lock}")
  
  result
end

#my_lockObject



17
18
19
# File 'lib/distlock/zk/exclusive_lock.rb', line 17

def my_lock
  @my_lock
end

#owner?Boolean

Returns:

  • (Boolean)


13
14
15
# File 'lib/distlock/zk/exclusive_lock.rb', line 13

def owner?
  @owner
end

#unlock(lock = @my_lock) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/distlock/zk/exclusive_lock.rb', line 69

def unlock(lock = @my_lock)
  return unless lock

  logger.debug "unlocking - #{lock}"
  
  zk.delete(:path => lock)
  
  if lock == @my_lock
    @my_lock = nil
    @owner = false
  end
end

#with_lock(path = "/distlock/zk/exclusive_lock/default") ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/distlock/zk/exclusive_lock.rb', line 136

def with_lock(path="/distlock/zk/exclusive_lock/default")                            
  begin
    lock(path)
    yield if block_given?
  rescue ZookeeperExceptions::ZookeeperException::SessionExpired => e
    close
    raise LockError.new("error encountered while attempting to obtain lock - #{e}, zookeeper session has been closed")
  rescue ZookeeperExceptions::ZookeeperException => e
    raise LockError.new("error encountered while attempting to obtain lock - #{e}")
  ensure
    begin
      unlock
    rescue ZookeeperExceptions::ZookeeperException => e
      logger.error("Distlock::ZK::ExclusiveLock#with_lock: error while unlocking - #{e}, closing session to clean up our lock")
      close
    end
  end
end