Class: DaemonRunner::Semaphore
- Inherits:
-
Object
- Object
- DaemonRunner::Semaphore
- Extended by:
- Logger
- Includes:
- Logger
- Defined in:
- lib/daemon_runner/semaphore.rb
Overview
Manage semaphore locks with Consul
Instance Attribute Summary collapse
-
#limit ⇒ Object
readonly
The number of nodes that can obtain a semaphore lock.
-
#lock_content ⇒ Object
readonly
The lock content.
-
#lock_modify_index ⇒ Object
readonly
The current lock modify index.
-
#members ⇒ Object
readonly
The current semaphore members.
-
#prefix ⇒ Object
readonly
The Consul key prefix.
-
#session ⇒ Object
readonly
The Consul session.
-
#state ⇒ Object
readonly
The current state of the semaphore.
Class Method Summary collapse
-
.lock(name, limit = 3, **options) ⇒ DaemonRunner::Semaphore
Acquire a lock with the current session.
Instance Method Summary collapse
-
#contender_key(value = 'none') ⇒ Object
Create a contender key.
- #create_session(name) ⇒ Object
-
#initialize(name:, prefix: nil, lock: nil, limit: 3) ⇒ Semaphore
constructor
A new instance of Semaphore.
-
#lock ⇒ Boolean
Obtain a lock with the current session.
-
#locked? ⇒ Boolean
Check if the semaphore holds the lock.
-
#release ⇒ Boolean
Release a lock with the current session.
-
#renew ⇒ Thread
Renew lock watching for changes.
-
#renew? ⇒ Boolean
Start a blocking query on the prefix, if there are changes we need to try to obtain the lock again.
-
#semaphore_state ⇒ Object
Get the current semaphore state by fetching all conterder keys and the lock key.
- #set_limit(new_limit) ⇒ Object
- #try_lock ⇒ Object
- #try_release ⇒ Object
-
#write_lock ⇒ Boolean
Write a new lock file if the number of contenders is less than ‘limit`.
Methods included from Logger
Constructor Details
#initialize(name:, prefix: nil, lock: nil, limit: 3) ⇒ Semaphore
Returns a new instance of Semaphore.
67 68 69 70 71 72 73 74 75 76 |
# File 'lib/daemon_runner/semaphore.rb', line 67 def initialize(name:, prefix: nil, lock: nil, limit: 3) create_session(name) @prefix = prefix.nil? ? "service/#{name}/lock/" : prefix @prefix += '/' unless @prefix.end_with?('/') @lock = lock.nil? ? "#{@prefix}.lock" : lock @lock_modify_index = nil @lock_content = nil @limit = set_limit(limit) @reset = false end |
Instance Attribute Details
#limit ⇒ Object (readonly)
The number of nodes that can obtain a semaphore lock
62 63 64 |
# File 'lib/daemon_runner/semaphore.rb', line 62 def limit @limit end |
#lock_content ⇒ Object (readonly)
The lock content
56 57 58 |
# File 'lib/daemon_runner/semaphore.rb', line 56 def lock_content @lock_content end |
#lock_modify_index ⇒ Object (readonly)
The current lock modify index
53 54 55 |
# File 'lib/daemon_runner/semaphore.rb', line 53 def lock_modify_index @lock_modify_index end |
#members ⇒ Object (readonly)
The current semaphore members
50 51 52 |
# File 'lib/daemon_runner/semaphore.rb', line 50 def members @members end |
#prefix ⇒ Object (readonly)
The Consul key prefix
59 60 61 |
# File 'lib/daemon_runner/semaphore.rb', line 59 def prefix @prefix end |
#session ⇒ Object (readonly)
The Consul session
44 45 46 |
# File 'lib/daemon_runner/semaphore.rb', line 44 def session @session end |
#state ⇒ Object (readonly)
The current state of the semaphore
47 48 49 |
# File 'lib/daemon_runner/semaphore.rb', line 47 def state @state end |
Class Method Details
.lock(name, limit = 3, **options) ⇒ DaemonRunner::Semaphore
Acquire a lock with the current session
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/daemon_runner/semaphore.rb', line 17 def lock(name, limit = 3, **) .merge!(name: name) .merge!(limit: limit) semaphore = Semaphore.new() semaphore.lock if block_given? begin until semaphore.locked? semaphore.try_lock sleep 0.1 end lock_thr = semaphore.renew yield ensure lock_thr.kill unless lock_thr.nil? semaphore.release end end semaphore rescue Exception => e logger.error e logger.debug e.backtrace.join("\n") raise end |
Instance Method Details
#contender_key(value = 'none') ⇒ Object
Create a contender key
139 140 141 142 143 144 145 146 147 148 |
# File 'lib/daemon_runner/semaphore.rb', line 139 def contender_key(value = 'none') if value.nil? || value.empty? raise ArgumentError, 'Value cannot be empty or nil' end key = "#{prefix}/#{session.id}" ::DaemonRunner::RetryErrors.retry do @contender_key = Diplomat::Lock.acquire(key, session.id, value) end @contender_key end |
#create_session(name) ⇒ Object
121 122 123 124 125 |
# File 'lib/daemon_runner/semaphore.rb', line 121 def create_session(name) ::DaemonRunner::RetryErrors.retry(exceptions: [DaemonRunner::Session::CreateSessionError]) do @session = Session.start(name, behavior: 'delete') end end |
#lock ⇒ Boolean
Obtain a lock with the current session
82 83 84 85 86 |
# File 'lib/daemon_runner/semaphore.rb', line 82 def lock contender_key semaphore_state try_lock end |
#locked? ⇒ Boolean
Check if the semaphore holds the lock
91 92 93 94 |
# File 'lib/daemon_runner/semaphore.rb', line 91 def locked? semaphore_state lock_exists? && (lock_content['Holders'] || []).include?(session.id) end |
#release ⇒ Boolean
Release a lock with the current session
115 116 117 118 |
# File 'lib/daemon_runner/semaphore.rb', line 115 def release semaphore_state try_release end |
#renew ⇒ Thread
Renew lock watching for changes
99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/daemon_runner/semaphore.rb', line 99 def renew thr = Thread.new do loop do if renew? semaphore_state try_lock end end end thr end |
#renew? ⇒ Boolean
Start a blocking query on the prefix, if there are changes we need to try to obtain the lock again.
195 196 197 198 199 200 201 202 203 |
# File 'lib/daemon_runner/semaphore.rb', line 195 def renew? logger.debug("Watching Consul #{prefix} for changes") = { recurse: true } changes = Diplomat::Kv.get(prefix, , :wait, :wait) logger.info("Changes on #{prefix} detected") if changes changes rescue StandardError => e logger.error(e) end |
#semaphore_state ⇒ Object
Get the current semaphore state by fetching all conterder keys and the lock key
152 153 154 155 156 157 |
# File 'lib/daemon_runner/semaphore.rb', line 152 def semaphore_state = { decode_values: true, recurse: true } @state = Diplomat::Kv.get(prefix, , :return) decode_semaphore_state unless state.empty? state end |
#set_limit(new_limit) ⇒ Object
127 128 129 130 131 132 133 134 135 136 |
# File 'lib/daemon_runner/semaphore.rb', line 127 def set_limit(new_limit) if lock_exists? if new_limit.to_i != @limit.to_i logger.warn 'Limit in lockfile and @limit do not match using limit from lockfile' end @limit = lock_content['Limit'] else @limit = new_limit end end |
#try_lock ⇒ Object
159 160 161 162 163 164 165 166 167 168 |
# File 'lib/daemon_runner/semaphore.rb', line 159 def try_lock prune_members do_update = add_self_to_holders @reset = false if do_update format_holders @locked = write_lock end log_lock_state end |
#try_release ⇒ Object
170 171 172 173 174 175 176 177 178 179 |
# File 'lib/daemon_runner/semaphore.rb', line 170 def try_release do_update = remove_self_from_holders if do_update format_holders @locked = !write_lock end DaemonRunner::Session.release(prefix) session.destroy! log_release_state end |
#write_lock ⇒ Boolean
Write a new lock file if the number of contenders is less than ‘limit`
183 184 185 186 187 188 |
# File 'lib/daemon_runner/semaphore.rb', line 183 def write_lock index = lock_modify_index.nil? ? 0 : lock_modify_index value = generate_lockfile return true if value == true Diplomat::Kv.put(@lock, value, cas: index) end |