Class: Consulkit::SemaphoreCoordinator

Inherits:
Object
Defined in:
lib/consulkit/semaphore_coordinator.rb

Overview

Coordinates the use of a Consul key as a distributed semaphore, following the algorithm described by HashiCorp [here](https://developer.hashicorp.com/consul/tutorials/developer-configuration/distributed-semaphore).

Constructor Details

#initialize(client, key_prefix, logger = nil) ⇒ SemaphoreCoordinator

Initializes the coordinator using the given Consulkit client and key prefix.

Parameters:

  • client (Consulkit::Client)

    the client to use.

  • key_prefix (String)

    the KV key prefix under which the semaphore's keys are stored; each contender's key is "<key_prefix>/<session_id>".

  • logger (Logger) (defaults to: nil)

    if non-nil, will log during semaphore operations.



# File 'lib/consulkit/semaphore_coordinator.rb', line 13

def initialize(client, key_prefix, logger = nil)
  @client     = client
  @key_prefix = key_prefix
  @logger     = logger || Logger.new(IO::NULL)
end

Instance Method Details

#acquire(session_id, limit, backoff_cap: 10.0, timeout: nil) ⇒ Boolean

Repeatedly attempts to acquire the semaphore, using exponential backoff between attempts, until it succeeds or the timeout elapses.

Parameters:

  • session_id (String)

    the ID of a Consul session to associate with the semaphore.

  • limit (Integer)

    the maximum number of holders for the semaphore.

  • backoff_cap (Float) (defaults to: 10.0)

    the maximum interval to wait between attempts.

  • timeout (Float) (defaults to: nil)

    the maximum number of seconds to sleep before giving up; nil means try forever.

Returns:

  • (Boolean)

See Also:

  • Client.session_create


# File 'lib/consulkit/semaphore_coordinator.rb', line 29

def acquire(session_id, limit, backoff_cap: 10.0, timeout: nil)
  exponential_backoff(backoff_cap, timeout) do
    @logger.info(%(calling try_acquire("#{session_id}", #{limit})))

    try_acquire(session_id, limit)
  end
end
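The `exponential_backoff` helper called above is private and its body does not appear on this page. As a rough illustration of how a capped exponential backoff loop with an optional timeout might behave, here is a minimal sketch. The method name `backoff_retry`, the `base` starting interval, and the injectable `sleeper` are all assumptions made for illustration, and counting only sleep time against the timeout is inferred from the parameter description rather than from the source.

```ruby
# Hypothetical sketch of a capped exponential backoff loop.
# This is NOT Consulkit's private helper; names and defaults are assumptions.
def backoff_retry(cap, timeout, base: 0.1, sleeper: ->(seconds) { sleep(seconds) })
  slept   = 0.0
  attempt = 0

  loop do
    # Stop as soon as the block reports success.
    return true if yield

    # Double the wait after each failed attempt, but never exceed the cap.
    wait = [base * (2**attempt), cap].min

    # Give up once the next sleep would push total sleep time past the timeout.
    return false if timeout && slept + wait > timeout

    sleeper.call(wait)
    slept   += wait
    attempt += 1
  end
end
```

Passing the sleeper as a lambda keeps the sketch easy to exercise without real delays; a caller would normally leave it at its default.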

#release(session_id, backoff_cap: 10.0, timeout: nil) ⇒ Boolean

Repeatedly attempts to release the semaphore, using exponential backoff between attempts, until it succeeds or the timeout elapses.

Parameters:

  • session_id (String)

    the ID of the Consul session whose hold on the semaphore should be released.

  • backoff_cap (Float) (defaults to: 10.0)

    the maximum interval to wait between attempts.

  • timeout (Float) (defaults to: nil)

    the maximum number of seconds to sleep before giving up; nil means try forever.

Returns:

  • (Boolean)


# File 'lib/consulkit/semaphore_coordinator.rb', line 44

def release(session_id, backoff_cap: 10.0, timeout: nil)
  exponential_backoff(backoff_cap, timeout) do
    @logger.info(%(calling try_release("#{session_id}")))

    try_release(session_id)
  end
end
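Since `acquire` and `release` take the same session ID, a caller will often want to pair them around a critical section. The following is a minimal sketch of one way to do that. `with_semaphore` is a hypothetical helper, not part of Consulkit, and it assumes only that the coordinator responds to `acquire` and `release` with the signatures documented above and that `acquire` returns false when it gives up.

```ruby
# Hypothetical helper (not part of Consulkit): runs the block only while
# holding the semaphore, and returns false if the semaphore was not acquired.
def with_semaphore(coordinator, session_id, limit, timeout: nil)
  return false unless coordinator.acquire(session_id, limit, timeout: timeout)

  begin
    yield
    true
  ensure
    # Release even if the block raises.
    coordinator.release(session_id, timeout: timeout)
  end
end
```

With a real coordinator this would be called as `with_semaphore(coordinator, session_id, 3, timeout: 30.0) { do_work }`, where `do_work` stands in for the caller's own code.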

#try_acquire(session_id, limit) ⇒ Boolean

Attempts to acquire the semaphore, allowing up to the given limit of holders.

Parameters:

  • session_id (String)

    the ID of a Consul session to associate with the semaphore.

  • limit (Integer)

    the maximum number of holders for the semaphore.

Returns:

  • (Boolean)

Raises:

  • (ArgumentError)

See Also:

  • Client.session_create


# File 'lib/consulkit/semaphore_coordinator.rb', line 60

def try_acquire(session_id, limit)
  raise ArgumentError, 'semaphore limit must be at least 1' if limit < 1

  read!

  return true if @holders.include? session_id

  unless @contenders.include? session_id
    return false unless @client.kv_acquire_lock("#{@key_prefix}/#{session_id}", session_id)

    @contenders << session_id
  end

  return false if @holders.size >= limit

  @logger.info("semaphore has less than #{limit} holders, attempting to grab")

  write_coordination_key(@holders.dup.add(session_id))
end
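The return values of `try_acquire` can be illustrated with a toy in-memory model that mirrors the branching above. `ToySemaphore` is hypothetical and deliberately leaves out everything Consul-specific: there are no sessions, no KV lock on the contender key, and no coordination-key write that can fail, so it shows only the holder and limit bookkeeping, not the distributed behaviour.

```ruby
require 'set'

# Hypothetical in-memory model of the holder/limit bookkeeping.
# It does not talk to Consul and is not how Consulkit stores state.
class ToySemaphore
  attr_reader :holders

  def initialize
    @contenders = Set.new
    @holders    = Set.new
  end

  def try_acquire(session_id, limit)
    raise ArgumentError, 'semaphore limit must be at least 1' if limit < 1

    # Already holding a slot: acquiring again is a no-op that succeeds.
    return true if @holders.include?(session_id)

    # Register interest, then take a slot only if one is free.
    @contenders << session_id
    return false if @holders.size >= limit

    @holders << session_id
    true
  end

  def try_release(session_id)
    # Releasing a slot that is not held succeeds without changing anything.
    return true unless @holders.include?(session_id)

    @contenders.delete(session_id)
    @holders.delete(session_id)
    true
  end
end
```

With a limit of 2, a third session is refused until one of the first two releases its slot, and a repeated call from an existing holder simply returns true.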

#try_release(session_id) ⇒ Boolean

Attempts to release the semaphore.

Parameters:

  • session_id (String)

    the ID of the Consul session whose hold on the semaphore should be released.

Returns:

  • (Boolean)


# File 'lib/consulkit/semaphore_coordinator.rb', line 85

def try_release(session_id)
  read!

  return true unless @holders.include? session_id

  if @contenders.include? session_id
    @client.kv_delete("#{@key_prefix}/#{session_id}")
    @contenders.delete(session_id)
  end

  write_coordination_key(@holders.dup.delete(session_id))
end