Class: GCSLock::Semaphore

Inherits:
Object
Defined in:
lib/gcslock/semaphore.rb

Constructor Details

#initialize(bucket, object, count, client: nil, uuid: nil, min_backoff: nil, max_backoff: nil) ⇒ Semaphore

Returns a new instance of Semaphore.


# File 'lib/gcslock/semaphore.rb', line 10

def initialize(bucket, object, count, client: nil, uuid: nil, min_backoff: nil, max_backoff: nil)
  @client = client || Google::Cloud::Storage.new
  @bucket = bucket
  @object = object
  @count = count

  @uuid = uuid || SecureRandom.uuid
  @min_backoff = min_backoff || 0.01
  @max_backoff = max_backoff || 5.0

  @permits = []
end

Instance Method Details

#acquire(permits: 1, timeout: nil, permits_to_check: nil) ⇒ Boolean

Attempts to acquire the requested number of permits, retrying with backoff until they become available or the timeout expires.

Parameters:

  • permits (Integer) (defaults to: 1)

    the number of permits to acquire

  • timeout (Integer) (defaults to: nil)

    how long to wait before giving up if the permits have not been obtained (waits indefinitely if nil).

  • permits_to_check (Integer) (defaults to: nil)

    the number of randomly sampled permit slots to try on each attempt while collecting the required permits (all slots if nil)

Returns:

  • (Boolean)

    `true` if the requested permits were obtained.

Raises:

  • (LockTimeoutError)

    if the permits could not be obtained before the timeout

# File 'lib/gcslock/semaphore.rb', line 36

def acquire(permits: 1, timeout: nil, permits_to_check: nil)
  begin
    Utils.backoff(min_backoff: @min_backoff, max_backoff: @max_backoff, timeout: timeout) do
      try_acquire(permits: permits, permits_to_check: permits_to_check)
    end
  rescue LockTimeoutError
    raise LockTimeoutError, "Unable to get semaphore permit for #{@object} before timeout"
  end
end
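
The `Utils.backoff` helper that `acquire` relies on is not shown on this page. The following is a minimal sketch of how such a retry loop might behave, assuming it doubles the delay between attempts up to `max_backoff` and raises once the timeout has passed. The names `backoff_sketch` and `SketchTimeoutError` are hypothetical and are not part of GCSLock.

```ruby
# Hypothetical stand-in for Utils.backoff; the real helper may differ.
class SketchTimeoutError < StandardError; end

def backoff_sketch(min_backoff: 0.01, max_backoff: 5.0, timeout: nil)
  clock = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = timeout && (clock.call + timeout)
  delay = min_backoff

  loop do
    # Stop as soon as the block reports success.
    return true if yield

    now = clock.call
    raise SketchTimeoutError, "timed out" if deadline && now >= deadline

    # Never sleep past the deadline.
    sleep(deadline ? [delay, deadline - now].min : delay)

    # Double the delay for the next attempt, capped at max_backoff.
    delay = [delay * 2, max_backoff].min
  end
end
```

With a loop of this shape, passing `timeout: nil` retries indefinitely, which matches the documented "unlimited if nil" behaviour of `acquire`.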

#available_permits ⇒ Integer

Returns the current number of permits available for this semaphore.

Returns:

  • (Integer)

    The number of permits available


# File 'lib/gcslock/semaphore.rb', line 134

def available_permits
  mutexes = @count.times.map { |index| mutex_object(index: index) }
  mutexes.select! { |mutex| !mutex.locked? }

  mutexes.size
end

#drain_permits ⇒ Integer

Acquires and returns all permits that are immediately available.

Returns:

  • (Integer)

    The number of permits acquired


# File 'lib/gcslock/semaphore.rb', line 122

def drain_permits
  mutexes = @count.times.map { |index| mutex_object(index: index) }
  mutexes.select! { |mutex| mutex.try_lock }

  @permits.push(*mutexes)

  mutexes.size
end

#owned_permits ⇒ Integer

Returns the current number of permits owned by this process for this semaphore.

Returns:

  • (Integer)

    The number of permits owned by this process


# File 'lib/gcslock/semaphore.rb', line 144

def owned_permits
  @permits.select! { |mutex| mutex.owned? }
  @permits.size
end

#release(permits: 1) ⇒ Object

Releases the given number of permits.

Parameters:

  • permits (Integer) (defaults to: 1)

    the number of permits to release

Returns:

  • nil

Raises:

  • (LockNotOwnedError)

    if this process holds fewer permits than the number requested

# File 'lib/gcslock/semaphore.rb', line 80

def release(permits: 1)
  permits.times do
    raise LockNotOwnedError, "No semaphore for #{@object} is owned by this process" unless @permits&.any?

    @permits.pop.unlock
  end

  nil
end
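
Because `acquire` and `release` are separate calls, callers usually want the release to happen even when the guarded work raises. A minimal sketch of that pattern follows. `with_permits` is a hypothetical helper, not part of GCSLock, and it assumes only that the object passed in responds to `acquire(permits:, timeout:)` and `release(permits:)` as documented here.

```ruby
# Hypothetical helper, not part of GCSLock: pairs acquire with release.
def with_permits(semaphore, permits: 1, timeout: nil)
  # If acquire raises (for example on timeout), nothing is held,
  # so release is deliberately kept outside this call.
  semaphore.acquire(permits: permits, timeout: timeout)

  begin
    yield
  ensure
    # Runs even if the block raises, so permits are not left held.
    semaphore.release(permits: permits)
  end
end
```

The helper returns whatever the block returns, so it can wrap an existing expression without changing the surrounding code.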

#release_all ⇒ Object

Releases all of the owned permits.

Returns:

  • nil

Raises:


# File 'lib/gcslock/semaphore.rb', line 95

def release_all
  while @permits&.any?
    @permits.pop.unlock
  end

  nil
end

#release_all! ⇒ Object

Forcibly releases every permit in the semaphore, including permits that are not owned by this process.

Returns:

  • nil


# File 'lib/gcslock/semaphore.rb', line 106

def release_all!
  mutexes = @count.times.map { |index| mutex_object(index: index) }
  mutexes.each do |mut|
    mut.unlock!
  rescue LockNotFoundError
    nil
  end

  @permits = []

  nil
end

#try_acquire(permits: 1, permits_to_check: nil) ⇒ Boolean

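Attempts to obtain the requested number of permits and returns immediately without waiting. If fewer than the requested number can be locked, any permits locked during the attempt are released again.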

Parameters:

  • permits (Integer) (defaults to: 1)

    the number of permits to acquire

  • permits_to_check (Integer) (defaults to: nil)

    the number of randomly sampled permit slots to try while collecting the required permits (all slots if nil)

Returns:

  • (Boolean)

    `true` if the requested number of permits was granted.


# File 'lib/gcslock/semaphore.rb', line 53

def try_acquire(permits: 1, permits_to_check: nil)
  acquired = []

  @count.times.to_a.sample(permits_to_check || @count).each do |index|
    mutex = mutex_object(index: index)
    if mutex.try_lock
      acquired.push(mutex)
      break if acquired.size == permits
    end
  end

  if acquired.size < permits
    acquired.each { |mutex| mutex.unlock }
    return false
  end

  @permits.push(*acquired)
  true
end
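
The all-or-nothing behaviour of `try_acquire` can be tried out without Google Cloud Storage. The sketch below is an in-memory illustration only: Ruby's built-in `Mutex` stands in for the GCS-backed mutex objects, and the class name `InMemorySemaphoreSketch` is hypothetical. It mirrors the sample, lock, and roll-back steps of the source above.

```ruby
# In-memory illustration only: Mutex stands in for the GCS-backed mutexes.
class InMemorySemaphoreSketch
  def initialize(count)
    @slots = Array.new(count) { Mutex.new }
    @permits = []
  end

  def try_acquire(permits: 1, permits_to_check: nil)
    acquired = []

    # Visit a random sample of slots, stopping once enough are locked.
    @slots.sample(permits_to_check || @slots.size).each do |slot|
      next unless slot.try_lock

      acquired.push(slot)
      break if acquired.size == permits
    end

    if acquired.size < permits
      # Roll back the partial acquisition so no permits leak.
      acquired.each(&:unlock)
      return false
    end

    @permits.push(*acquired)
    true
  end

  # Counts the slots that are currently unlocked.
  def available_permits
    @slots.count { |slot| !slot.locked? }
  end
end
```

With three slots, a request for two permits succeeds and leaves one available. A second request for two permits then fails, and the single slot it managed to lock is released again, so the available count is unchanged.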