Class: DaemonRunner::Semaphore

Inherits:
Object
  • Object
show all
Extended by:
Logger
Includes:
Logger
Defined in:
lib/daemon_runner/semaphore.rb

Overview

Manage semaphore locks with Consul

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logger

logger, logger_name

Constructor Details

#initialize(name:, prefix: nil, lock: nil, limit: 3) ⇒ Semaphore

Returns a new instance of Semaphore.

Parameters:

  • name (String)

    The name of the session, it is also used in the ‘prefix`

  • prefix (String|NilClass) (defaults to: nil)

    The Consul Kv prefix

  • lock (String|NilClass) (defaults to: nil)

    The path to the lock file



67
68
69
70
71
72
73
74
75
76
# File 'lib/daemon_runner/semaphore.rb', line 67

def initialize(name:, prefix: nil, lock: nil, limit: 3)
  create_session(name)
  @prefix = prefix.nil? ? "service/#{name}/lock/" : prefix
  @prefix += '/' unless @prefix.end_with?('/')
  @lock = lock.nil? ? "#{@prefix}.lock" : lock
  @lock_modify_index = nil
  @lock_content = nil
  @limit = set_limit(limit)
  @reset = false
end

Instance Attribute Details

#limitObject (readonly)

The number of nodes that can obtain a semaphore lock



62
63
64
# File 'lib/daemon_runner/semaphore.rb', line 62

def limit
  @limit
end

#lock_contentObject (readonly)

The lock content



56
57
58
# File 'lib/daemon_runner/semaphore.rb', line 56

def lock_content
  @lock_content
end

#lock_modify_indexObject (readonly)

The current lock modify index



53
54
55
# File 'lib/daemon_runner/semaphore.rb', line 53

def lock_modify_index
  @lock_modify_index
end

#membersObject (readonly)

The current semaphore members



50
51
52
# File 'lib/daemon_runner/semaphore.rb', line 50

def members
  @members
end

#prefixObject (readonly)

The Consul key prefix



59
60
61
# File 'lib/daemon_runner/semaphore.rb', line 59

def prefix
  @prefix
end

#sessionObject (readonly)

The Consul session



44
45
46
# File 'lib/daemon_runner/semaphore.rb', line 44

def session
  @session
end

#stateObject (readonly)

The current state of the semaphore



47
48
49
# File 'lib/daemon_runner/semaphore.rb', line 47

def state
  @state
end

Class Method Details

.lock(name, limit = 3, **options) ⇒ DaemonRunner::Semaphore

Acquire a lock with the current session

Parameters:

  • limit (Integer) (defaults to: 3)

    The number of nodes that can request the lock

Returns:

See Also:



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/daemon_runner/semaphore.rb', line 17

def lock(name, limit = 3, **options)
  options.merge!(name: name)
  options.merge!(limit: limit)
  semaphore = Semaphore.new(options)
  semaphore.lock
  if block_given?
    begin
      until semaphore.locked?
        semaphore.try_lock
        sleep 0.1
      end
      lock_thr = semaphore.renew
      yield
    ensure
      lock_thr.kill unless lock_thr.nil?
      semaphore.release
    end
  end
  semaphore
rescue Exception => e
  logger.error e
  logger.debug e.backtrace.join("\n")
  raise
end

Instance Method Details

#contender_key(value = 'none') ⇒ Object

Create a contender key



139
140
141
142
143
144
145
146
147
148
# File 'lib/daemon_runner/semaphore.rb', line 139

def contender_key(value = 'none')
  if value.nil? || value.empty?
    raise ArgumentError, 'Value cannot be empty or nil'
  end
  key = "#{prefix}/#{session.id}"
  ::DaemonRunner::RetryErrors.retry do
    @contender_key = Diplomat::Lock.acquire(key, session.id, value)
  end
  @contender_key
end

#create_session(name) ⇒ Object



121
122
123
124
125
# File 'lib/daemon_runner/semaphore.rb', line 121

def create_session(name)
  ::DaemonRunner::RetryErrors.retry(exceptions: [DaemonRunner::Session::CreateSessionError]) do
    @session = Session.start(name, behavior: 'delete')
  end
end

#lockBoolean

Obtain a lock with the current session

Returns:

  • (Boolean)

    ‘true` if the lock was obtained



82
83
84
85
86
# File 'lib/daemon_runner/semaphore.rb', line 82

def lock
  contender_key
  semaphore_state
  try_lock
end

#locked?Boolean

Check if the semaphore holds the lock

Returns:

  • (Boolean)

    ‘true` if the lock is held, `false` otherwise



91
92
93
94
# File 'lib/daemon_runner/semaphore.rb', line 91

def locked?
  semaphore_state
  lock_exists? && (lock_content['Holders'] || []).include?(session.id)
end

#releaseBoolean

Release a lock with the current session

Returns:

  • (Boolean)

    ‘true` if the lock was released



115
116
117
118
# File 'lib/daemon_runner/semaphore.rb', line 115

def release
  semaphore_state
  try_release
end

#renewThread

Renew lock watching for changes

Returns:

  • (Thread)

    Thread running a blocking call maintaining the lock state



99
100
101
102
103
104
105
106
107
108
109
# File 'lib/daemon_runner/semaphore.rb', line 99

def renew
  thr = Thread.new do
    loop do
      if renew?
        semaphore_state
        try_lock
      end
    end
  end
  thr
end

#renew?Boolean

Start a blocking query on the prefix, if there are changes we need to try to obtain the lock again.

Returns:

  • (Boolean)

    ‘true` if there are changes, `false` if the request has timed out



195
196
197
198
199
200
201
202
203
# File 'lib/daemon_runner/semaphore.rb', line 195

def renew?
  logger.debug("Watching Consul #{prefix} for changes")
  options = { recurse: true }
  changes = Diplomat::Kv.get(prefix, options, :wait, :wait)
  logger.info("Changes on #{prefix} detected") if changes
  changes
rescue StandardError => e
  logger.error(e)
end

#semaphore_stateObject

Get the current semaphore state by fetching all conterder keys and the lock key



152
153
154
155
156
157
# File 'lib/daemon_runner/semaphore.rb', line 152

def semaphore_state
  options = { decode_values: true, recurse: true }
  @state = Diplomat::Kv.get(prefix, options, :return)
  decode_semaphore_state unless state.empty?
  state
end

#set_limit(new_limit) ⇒ Object



127
128
129
130
131
132
133
134
135
136
# File 'lib/daemon_runner/semaphore.rb', line 127

def set_limit(new_limit)
  if lock_exists?
    if new_limit.to_i != @limit.to_i
      logger.warn 'Limit in lockfile and @limit do not match using limit from lockfile'
    end
    @limit = lock_content['Limit']
  else
    @limit = new_limit
  end
end

#try_lockObject



159
160
161
162
163
164
165
166
167
168
# File 'lib/daemon_runner/semaphore.rb', line 159

def try_lock
  prune_members
  do_update = add_self_to_holders
  @reset = false
  if do_update
    format_holders
    @locked = write_lock
  end
  log_lock_state
end

#try_releaseObject



170
171
172
173
174
175
176
177
178
179
# File 'lib/daemon_runner/semaphore.rb', line 170

def try_release
  do_update = remove_self_from_holders
  if do_update
    format_holders
    @locked = !write_lock
  end
  DaemonRunner::Session.release(prefix)
  session.destroy!
  log_release_state
end

#write_lockBoolean

Write a new lock file if the number of contenders is less than ‘limit`

Returns:

  • (Boolean)

    ‘true` if the lock was written succesfully



183
184
185
186
187
188
# File 'lib/daemon_runner/semaphore.rb', line 183

def write_lock
  index = lock_modify_index.nil? ? 0 : lock_modify_index
  value = generate_lockfile
  return true if value == true
  Diplomat::Kv.put(@lock, value, cas: index)
end