Class: Consulkit::SemaphoreCoordinator
- Inherits:
-
Object
- Object
- Consulkit::SemaphoreCoordinator
- Defined in:
- lib/consulkit/semaphore_coordinator.rb
Overview
Coordinates the usage of a consul key as distributed semaphore, following the algorithm described by Hashicorp [here](developer.hashicorp.com/consul/tutorials/developer-configuration/distributed-semaphore).
Instance Method Summary collapse
-
#acquire(session_id, limit, backoff_cap: 10.0, timeout: nil) ⇒ Boolean
Continually attempts to acquire the semaphore until successful using exponential backoff.
-
#initialize(client, key_prefix, logger = nil) ⇒ SemaphoreCoordinator
constructor
Initializes the coordinater using the given Consulkit client and key prefix.
-
#release(session_id, backoff_cap: 10.0, timeout: nil) ⇒ Boolean
Continually attempts to release the semaphore until successful using exponential backoff.
-
#try_acquire(session_id, limit) ⇒ Boolean
Attempts to acquire the semaphore, allowing up to the given limit of holders.
-
#try_release(session_id) ⇒ Boolean
Attempts to release the semaphore.
Constructor Details
#initialize(client, key_prefix, logger = nil) ⇒ SemaphoreCoordinator
Initializes the coordinater using the given Consulkit client and key prefix.
13 14 15 16 17 |
# File 'lib/consulkit/semaphore_coordinator.rb', line 13 def initialize(client, key_prefix, logger = nil) @client = client @key_prefix = key_prefix @logger = logger || Logger.new(IO::NULL) end |
Instance Method Details
#acquire(session_id, limit, backoff_cap: 10.0, timeout: nil) ⇒ Boolean
Continually attempts to acquire the semaphore until successful using exponential backoff.
29 30 31 32 33 34 35 |
# File 'lib/consulkit/semaphore_coordinator.rb', line 29 def acquire(session_id, limit, backoff_cap: 10.0, timeout: nil) exponential_backoff(backoff_cap, timeout) do @logger.info(%(calling try_acquire("#{session_id}", #{limit}))) try_acquire(session_id, limit) end end |
#release(session_id, backoff_cap: 10.0, timeout: nil) ⇒ Boolean
Continually attempts to release the semaphore until successful using exponential backoff.
44 45 46 47 48 49 50 |
# File 'lib/consulkit/semaphore_coordinator.rb', line 44 def release(session_id, backoff_cap: 10.0, timeout: nil) exponential_backoff(backoff_cap, timeout) do @logger.info(%(calling try_release("#{session_id}"))) try_release(session_id) end end |
#try_acquire(session_id, limit) ⇒ Boolean
Attempts to acquire the semaphore, allowing up to the given limit of holders.
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/consulkit/semaphore_coordinator.rb', line 60 def try_acquire(session_id, limit) raise ArgumentError, 'semaphore limit must be at least 1' if limit < 1 read! return true if @holders.include? session_id unless @contenders.include? session_id return false unless @client.kv_acquire_lock("#{@key_prefix}/#{session_id}", session_id) @contenders << session_id end return false if @holders.size >= limit @logger.info("semaphore has less than #{limit} holders, attempting to grab") write_coordination_key(@holders.dup.add(session_id)) end |
#try_release(session_id) ⇒ Boolean
Attempts to release the semaphore.
85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/consulkit/semaphore_coordinator.rb', line 85 def try_release(session_id) read! return true unless @holders.include? session_id if @contenders.include? session_id @client.kv_delete("#{@key_prefix}/#{session_id}") @contenders.delete(session_id) end write_coordination_key(@holders.dup.delete(session_id)) end |