Class: Distlock::ZK::ExclusiveLock
- Inherits:
-
Object
- Object
- Distlock::ZK::ExclusiveLock
show all
- Includes:
- Common
- Defined in:
- lib/distlock/zk/exclusive_lock.rb
Constant Summary
collapse
- WATCHER_TIMEOUT =
60
Instance Method Summary
collapse
Methods included from Common
#close, #create_sequenced_ephemeral, #exists?, #logger, #logger=, #safe_create, #zk
Constructor Details
#initialize(options = {}) ⇒ ExclusiveLock
Returns a new instance of ExclusiveLock.
8
9
10
11
|
# File 'lib/distlock/zk/exclusive_lock.rb', line 8
# Stores the client configuration in @options.
#
# Caller-supplied entries win over the built-in defaults (:host, :timeout,
# :root_path); keys not covered by the defaults are kept as given.
# NOTE(review): nothing visible in this class reads @options; presumably the
# Common mixin consumes it when building the connection. Confirm there.
#
# @param options [Hash] overrides for the default settings
def initialize(options={})
  @options = {
    :host => "localhost:2181",
    :timeout => 10,
    :root_path => "/lock/exclusive/default"
  }.merge(options)
end
|
Instance Method Details
#_get_lock(lock) ⇒ Object
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
# File 'lib/distlock/zk/exclusive_lock.rb', line 82
# Blocks until +lock+ (the full path of this client's lock node) is the first
# child of its parent node, i.e. until this client owns the lock.
#
# Every pass re-reads the parent's children, ordered by the text after the
# last '-' in each name (compared as strings; presumably a zero-padded
# sequence number, confirm against create_sequenced_ephemeral).
# * our node missing from the children -> LockError
# * our node first                     -> @owner = true, return true
# * otherwise                          -> watch the node directly ahead of
#   ours and poll @watcher_called (set by do_watcher) before re-checking.
#
# If the node ahead of ours has already vanished by the time we try to watch
# it (its holder released the lock between our listing and our stat), that is
# a normal race, not a failure: the children are simply listed again. The
# session is no longer closed and no LockError is raised in that case.
#
# @param lock [String] full path of our lock node
# @return [true] once the lock has been acquired
# @raise [LockError] if our node is not among the children, or if no watcher
#   event arrives within WATCHER_TIMEOUT seconds (the session is closed first)
def _get_lock(lock)
  logger.debug "_get_lock: entered for #{lock}"

  # Neither value changes between passes, so split the path once up front.
  segments = lock.split('/')
  path = segments[0...-1].join('/')
  lock_last = segments.last

  until @owner
    children = zk.get_children(:path => path)[:children].sort { |a, b| a.split('-').last <=> b.split('-').last }
    lock_idx = children.index(lock_last)

    if lock_idx.nil?
      logger.error("Distlock::ZK::ExclusiveLock#_get_lock: failed to find our lock in the node children (connection reset?)")
      raise LockError.new("failed to find our lock in the node children (connection reset?)")
    elsif lock_idx == 0
      logger.debug "lock acquired (client id - #{zk.client_id}), lock - #{lock}"
      @owner = true
      return true
    end

    logger.debug "Distlock::ZK::ExclusiveLock#_get_lock: lock contention for #{lock} - #{children.inspect} (my client id - #{zk.client_id})"
    logger.info "Distlock::ZK::ExclusiveLock#_get_lock: lock contention - #{lock}"
    to_watch = "#{path}/#{children[lock_idx - 1]}"
    logger.debug "about to set watch on - #{to_watch}"
    @watcher_called = false
    @watcher = Zookeeper::WatcherCallback.new { do_watcher(@watcher, lock) }

    # Plain stat first: the watcher is only attached when the node was seen
    # to exist, and the second response tells us whether it still did then.
    resp = zk.stat(:path => to_watch)
    resp = zk.stat(:path => to_watch, :watcher => @watcher) if resp[:stat].exists

    unless resp[:stat].exists
      # Predecessor released its lock while we were setting up; list again.
      logger.info "Distlock::ZK::ExclusiveLock#_get_lock: node to watch no longer exists - #{to_watch}, re-checking children, our lock - #{lock}"
      next
    end

    logger.info "Distlock::ZK::ExclusiveLock#_get_lock: watcher set, node exists, watching - #{to_watch}, our lock - #{lock}"
    start_time = Time.now
    until @watcher_called
      sleep 0.1
      if (start_time + WATCHER_TIMEOUT) < Time.now
        logger.error("Distlock::ZK::ExclusiveLock#_get_lock: timed out while watching - #{to_watch}, our lock - #{lock}, closing session and bombing out")
        close
        raise LockError.new("Distlock::ZK::ExclusiveLock#_get_lock timed out while waiting for watcher")
      end
    end
  end
end
|
#check_for_existing_lock(path) ⇒ Object
40
41
42
43
44
45
46
47
48
49
|
# File 'lib/distlock/zk/exclusive_lock.rb', line 40
# Looks under +path+ for a lock node that already belongs to this client.
#
# Children are visited in order of the text after the last '-' in their
# names; a child belongs to us when the second '-'-separated field of its
# name equals zk.client_id rendered as a string.
#
# @param path [String] parent node whose children are inspected
# @return [String, nil] "#{path}/#{child}" for the first matching child, or
#   nil when no child matches
def check_for_existing_lock(path)
  ordered = zk.get_children(:path => path)[:children].sort { |x, y| x.split('-').last <=> y.split('-').last }

  ordered.each do |node|
    logger.debug "checking existing lock for our client_id - #{node} vs. #{zk.client_id}"
    next unless node.split('-')[1] == zk.client_id.to_s

    logger.debug "found existing lock for client_id #{zk.client_id}, lock - #{node}, reusing"
    return "#{path}/#{node}"
  end

  nil
end
|
#do_watcher(watcher, lock) ⇒ Object
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
# File 'lib/distlock/zk/exclusive_lock.rb', line 21
# Handles an event delivered to the watcher that _get_lock builds.
#
# A delete event is the expected outcome: it is logged and @watcher_called is
# set to true, which is the flag _get_lock polls. Any other event type is
# logged as an error, the session is closed and a LockError is raised;
# @watcher_called is left untouched in that case.
#
# @param watcher [#type] the watcher callback whose event type is inspected
# @param lock [String] path passed in by _get_lock (our own lock node); only
#   used in log messages
# @return [true] after a delete event
# @raise [LockError] for every event type other than delete
def do_watcher(watcher, lock)
  if watcher.type == ZookeeperConstants::ZOO_DELETED_EVENT
    logger.debug "Distlock::ZK::ExclusiveLock#do_watcher: watcher called for delete of node - #{lock}"
    return @watcher_called = true
  end

  complaint =
    if watcher.type == ZookeeperConstants::ZOO_CREATED_EVENT
      "Distlock::ZK::ExclusiveLock#do_watcher: watcher called for creation of node (should never happen for an ephemeral node?) - #{lock}"
    elsif watcher.type == ZookeeperConstants::ZOO_SESSION_EVENT
      "Distlock::ZK::ExclusiveLock#do_watcher: watcher called for zoo session event, closing the session for this client - #{lock}"
    else
      "Distlock::ZK::ExclusiveLock#do_watcher: watcher called for unexpected event, closing the session for this client - #{lock}, event - #{watcher.type}"
    end
  logger.error complaint

  close
  raise LockError.new("Distlock::ZK::ExclusiveLock#do_watcher: got an unexpected watcher type - #{watcher.type}")
end
|
#lock(path) ⇒ Object
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
# File 'lib/distlock/zk/exclusive_lock.rb', line 51
# Acquires the exclusive lock rooted at +path+, blocking inside _get_lock
# until it is ours.
#
# Resets @owner, makes sure +path+ exists via safe_create, then reuses a lock
# node found by check_for_existing_lock or creates a new one with
# create_sequenced_ephemeral. The node path is remembered in @my_lock before
# waiting, so it is available to #unlock even if the wait fails.
#
# @param path [String] parent node for the lock; must start with "/"
# @return [Object] whatever _get_lock returns
# @raise [LockError] if +path+ does not start with "/"
def lock(path)
  unless path.start_with?("/")
    raise LockError.new("invalid lock path, must start with '/'")
  end

  @owner = false
  safe_create(path)

  node = check_for_existing_lock(path) || create_sequenced_ephemeral(path)
  logger.debug "my lock path - #{node}"
  @my_lock = node

  acquired = _get_lock(node)
  logger.info("Distlock::ZK::ExclusiveLock#lock: lock acquired - #{node}")
  acquired
end
|
#my_lock ⇒ Object
17
18
19
|
# File 'lib/distlock/zk/exclusive_lock.rb', line 17
# Path of the lock node this instance is currently working with.
#
# #lock stores the node path here before waiting for ownership, and #unlock
# resets it to nil after deleting that node, so a non-nil value does not by
# itself mean the lock is held (see #owner?).
#
# @return [String, nil] the stored lock node path, or nil when none is set
def my_lock
@my_lock
end
|
#owner? ⇒ Boolean
13
14
15
|
# File 'lib/distlock/zk/exclusive_lock.rb', line 13
# Whether this instance currently holds the lock.
#
# @owner is set to false at the start of #lock, to true by #_get_lock once
# our node is first among the children, and back to false by #unlock when it
# removes our own node. It is returned as-is, so the result is nil until one
# of those methods has assigned it.
#
# @return [Boolean, nil] current value of @owner
def owner?
@owner
end
|
#unlock(lock = @my_lock) ⇒ Object
69
70
71
72
73
74
75
76
77
78
79
80
|
# File 'lib/distlock/zk/exclusive_lock.rb', line 69
# Releases a lock by deleting its node.
#
# Does nothing when +lock+ is nil/false (for example when no lock was ever
# taken). The ownership state (@my_lock, @owner) is only cleared when the
# deleted node is the one recorded in @my_lock.
#
# @param lock [String, nil] path of the node to delete; defaults to @my_lock
# @return [false, nil] false after clearing our own lock state, nil otherwise
#   (incidental; callers visible here ignore it)
def unlock(lock = @my_lock)
  return nil unless lock

  logger.debug "unlocking - #{lock}"
  zk.delete(:path => lock)

  # Deleting somebody else's node must not disturb our own bookkeeping.
  return nil unless lock == @my_lock

  @my_lock = nil
  @owner = false
end
|
#with_lock(path = "/distlock/zk/exclusive_lock/default") ⇒ Object
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
# File 'lib/distlock/zk/exclusive_lock.rb', line 136
# Runs the given block while holding the lock at +path+, always attempting to
# unlock afterwards.
#
# ZooKeeper errors raised while locking or while running the block are
# re-raised as LockError; on session expiry the session is closed first.
# A ZooKeeper error during the final unlock is logged and answered by closing
# the session instead of being raised.
#
# @param path [String] parent node for the lock
# @yield the work to perform while the lock is held (optional)
# @return [Object, nil] the block's value, or nil when no block is given
# @raise [LockError] when a ZookeeperException occurs before unlocking
def with_lock(path="/distlock/zk/exclusive_lock/default")
  lock(path)
  yield if block_given?
rescue ZookeeperExceptions::ZookeeperException::SessionExpired => expired
  # More specific class first: an expired session also has to be closed.
  close
  raise LockError.new("error encountered while attempting to obtain lock - #{expired}, zookeeper session has been closed")
rescue ZookeeperExceptions::ZookeeperException => zk_error
  raise LockError.new("error encountered while attempting to obtain lock - #{zk_error}")
ensure
  begin
    unlock
  rescue ZookeeperExceptions::ZookeeperException => unlock_error
    logger.error("Distlock::ZK::ExclusiveLock#with_lock: error while unlocking - #{unlock_error}, closing session to clean up our lock")
    close
  end
end
|