Class: GCSLock::Mutex

Inherits:
Object
  • Object
show all
Defined in:
lib/gcslock/mutex.rb

Instance Method Summary collapse

Constructor Details

#initialize(bucket, object, client: nil, uuid: nil, min_backoff: nil, max_backoff: nil) ⇒ Mutex

Returns a new instance of Mutex.



8
9
10
11
12
13
14
15
16
# File 'lib/gcslock/mutex.rb', line 8

# Sets up a mutex backed by the object +object+ inside the bucket +bucket+.
#
# Both the bucket and the file reference are requested with
# +skip_lookup: true+ (presumably so that constructing the mutex does not
# itself hit the storage API -- confirm against the client documentation).
#
# @param bucket [String] bucket name passed to the client's +bucket+ call
# @param object [String] object name passed to the bucket's +file+ call
# @param client [Object, nil] storage client; when nil a new
#   Google::Cloud::Storage client is created
# @param uuid [String, nil] value written as the lock's contents and used
#   by #owned? to recognise this instance; defaults to SecureRandom.uuid
# @param min_backoff [Numeric, nil] first retry delay used by #lock
#   (defaults to 0.01)
# @param max_backoff [Numeric, nil] largest retry delay used by #lock
#   (defaults to 5.0)
def initialize(bucket, object, client: nil, uuid: nil, min_backoff: nil, max_backoff: nil)
  @client = client
  @client ||= Google::Cloud::Storage.new
  @bucket = @client.bucket(bucket, skip_lookup: true)
  @object = @bucket.file(object, skip_lookup: true)

  @uuid = uuid
  @uuid ||= SecureRandom.uuid
  @min_backoff, @max_backoff = min_backoff || 0.01, max_backoff || 5.0
end

Instance Method Details

#lock(timeout: nil) ⇒ Object

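Attempts to grab the lock and waits if it isn’t available. Raises LockAlreadyOwnedError if the mutex is already owned by this instance, and LockTimeoutError if a timeout is given and the lock cannot be obtained before it elapses.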



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/gcslock/mutex.rb', line 20

# Acquires the lock, retrying with a growing delay until it succeeds or the
# optional timeout is used up.
#
# The delay starts at @min_backoff, doubles after every failed attempt and is
# capped at @max_backoff. When a timeout is given the delay is additionally
# capped at the time still remaining. Only the time spent sleeping is counted
# against the timeout; time spent inside try_lock is not.
#
# @param timeout [Numeric, nil] maximum total sleep time; nil waits forever
# @return [true] once the lock has been obtained
# @raise [LockAlreadyOwnedError] if this instance already owns the lock
# @raise [LockTimeoutError] if the next sleep would exceed the timeout
def lock(timeout: nil)
  raise LockAlreadyOwnedError, "Mutex for #{@object.name} is already owned by this process" if owned?

  bounded = !timeout.nil?
  delay = @min_backoff
  slept = bounded ? 0.0 : nil

  until try_lock
    # Give up as soon as the next sleep would push us past the deadline.
    if bounded && slept + delay > timeout
      raise LockTimeoutError, "Unable to get mutex for #{@object.name} before timeout"
    end

    sleep(delay)

    candidates = [@max_backoff, delay * 2]
    if bounded
      slept += delay
      # Never sleep longer than what is left of the timeout.
      candidates << (timeout - slept) if timeout > slept
    end
    delay = candidates.min
  end

  true
end

#locked? ⇒ Boolean

Returns true if this lock is currently held by anyone, i.e. the lock object exists in the bucket.

Returns:

  • (Boolean)


45
46
47
48
# File 'lib/gcslock/mutex.rb', line 45

# Reports whether the lock object currently exists, regardless of who
# created it.
#
# The file reference is reloaded first so the answer reflects the current
# remote state rather than previously fetched metadata.
#
# NOTE(review): reload! on a lazily referenced object appears to raise
# Google::Cloud::NotFoundError when the object is missing -- confirm against
# the google-cloud-storage documentation. A missing object simply means
# "not locked", so that error is translated to false instead of escaping.
#
# @return [Boolean] true when the lock object exists
def locked?
  @object.reload!
  @object.exists?
rescue Google::Cloud::NotFoundError
  false
end

#owned? ⇒ Boolean

Returns true if this lock is currently held by this instance, i.e. the lock object exists and its contents match this instance's UUID.

Returns:

  • (Boolean)


51
52
53
# File 'lib/gcslock/mutex.rb', line 51

# Reports whether the lock exists and was written by this instance.
#
# Ownership is decided by comparing the stored object's contents with @uuid.
# The size is compared first so the download is skipped when the lengths
# already differ.
#
# The comparison is done on bytes: the stored size is compared with the
# uuid's byte length rather than its character count, and both strings are
# compared as binary so that differing string encodings do not make equal
# bytes look unequal. This matters for caller-supplied non-ASCII uuids.
#
# @return [Boolean] true when the lock is held by this instance; falsy
#   otherwise
def owned?
  locked? && @object.size == @uuid.bytesize && @object.download.read.b == @uuid.b
end

#synchronize(timeout: nil) ⇒ Object

Obtains a lock, runs the block, and releases the lock when the block completes. Raises LockAlreadyOwnedError if the lock is already owned by the current instance.



57
58
59
60
61
62
63
64
65
66
# File 'lib/gcslock/mutex.rb', line 57

# Acquires the lock, runs the given block and releases the lock afterwards,
# even when the block raises.
#
# No separate ownership check is made here: #lock already raises
# LockAlreadyOwnedError (with the same message) when this instance holds the
# lock, so checking first would only repeat the same remote requests. The
# lock call sits outside the begin/ensure so a failed acquisition never
# triggers an unlock.
#
# @param timeout [Numeric, nil] forwarded to #lock
# @yield the critical section to run while holding the lock
# @return [Object] the block's return value
# @raise [LockAlreadyOwnedError] if this instance already owns the lock
# @raise [LockTimeoutError] if the lock is not obtained before the timeout
def synchronize(timeout: nil)
  lock(timeout: timeout)
  begin
    yield
  ensure
    unlock
  end
end

#try_lock ⇒ Object

Attempts to obtain the lock and returns immediately. Returns true if the lock was granted.



69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/gcslock/mutex.rb', line 69

# Makes a single, non-blocking attempt to take the lock.
#
# The lock is taken by inserting an object whose contents are @uuid with
# +if_generation_match: 0+ (presumably "only if no object exists yet" --
# confirm against the storage API documentation). A 412 response whose message
# starts with "conditionNotMet:" is treated as "lock not available"; any other
# client error is re-raised unchanged.
#
# @return [Boolean] true when the object was inserted, false when the
#   precondition failed
# @raise [Google::Apis::ClientError] for any other client error
def try_lock
  storage_api = @client.service.service
  storage_api.insert_object(
    @bucket.name,
    name: @object.name,
    if_generation_match: 0,
    upload_source: StringIO.new(@uuid),
  )
rescue Google::Apis::ClientError => error
  precondition_failed = error.status_code == 412 && error.message.start_with?('conditionNotMet:')
  raise unless precondition_failed

  false
else
  true
end

#unlock ⇒ Object

Releases the lock. Raises LockNotOwnedError if the lock is not owned by the current instance.

Raises:



85
86
87
88
# File 'lib/gcslock/mutex.rb', line 85

# Releases the lock by deleting the lock object, but only when this instance
# is the one holding it.
#
# @return [Object] whatever the object's +delete+ call returns
# @raise [LockNotOwnedError] if the lock is not held by this instance
def unlock
  return @object.delete if owned?

  raise LockNotOwnedError, "Mutex for #{@object.name} is not owned by this process"
end

#unlock! ⇒ Object

Releases the lock even if not owned by this instance. Raises LockNotFoundError if the lock cannot be found.



91
92
93
94
95
# File 'lib/gcslock/mutex.rb', line 91

# Releases the lock unconditionally by deleting the lock object, without
# checking which instance holds it.
#
# A Google::Cloud::NotFoundError from the delete is translated into
# LockNotFoundError. Because the new error is raised inside the rescue, Ruby
# keeps the original error available as its +cause+.
#
# @return [Object] whatever the object's +delete+ call returns
# @raise [LockNotFoundError] if the lock object does not exist
def unlock!
  @object.delete
rescue Google::Cloud::NotFoundError
  raise LockNotFoundError, "Mutex for #{@object.name} not found"
end