Class: Officer::LockStore

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/officer/lock_store.rb

Instance Method Summary collapse

Constructor Details

#initializeLockStore

Returns a new instance of LockStore.



22
23
24
25
26
# File 'lib/officer/lock_store.rb', line 22

def initialize
  @locks = {} # name => Lock
  @connections = {} # Connection => Set(name, ...)
  @acquire_counter = 0
end

Instance Method Details

#acquire(name, connection, options = {}) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/officer/lock_store.rb', line 54

def acquire name, connection, options={}
  if options[:queue_max]
    lock = @locks[name]

    if lock && !lock.queue.include?(connection) && lock.queue.length >= options[:queue_max]
      connection.queue_maxed name, :queue => lock.queue.to_host_a
      return
    end
  end

  @acquire_counter += 1

  lock = @locks[name] ||= Lock.new(name)

  if lock.queue.include? connection
    lock.queue.first == connection ? connection.already_acquired(name) : connection.queued(name, options)

  else
    lock.queue << connection
    (@connections[connection] ||= Set.new) << name

    lock.queue.count == 1 ? connection.acquired(name) : connection.queued(name, options)
  end
end

#close_idle_connections(max_idle) ⇒ Object



164
165
166
167
168
169
170
171
# File 'lib/officer/lock_store.rb', line 164

def close_idle_connections(max_idle)
  @connections.each do |conn, names|
    if conn.last_cmd_at < Time.now.utc - max_idle
      Officer::Log.error "Closing due to max idle time: #{conn.to_host_s}"
      conn.close_connection
    end
  end
end

#connections(connection) ⇒ Object



149
150
151
152
153
154
155
156
157
# File 'lib/officer/lock_store.rb', line 149

def connections connection
  connections = {}

  @connections.each do |conn, names|
    connections[conn.to_host_s] = names.to_a
  end

  connection.connections connections
end

#locks(connection) ⇒ Object



139
140
141
142
143
144
145
146
147
# File 'lib/officer/lock_store.rb', line 139

def locks connection
  locks = {}

  @locks.each do |name, lock|
    locks[name] = lock.queue.to_host_a
  end

  connection.locks locks
end

#log_stateObject



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/officer/lock_store.rb', line 28

def log_state
  l = Officer::Log

  l.info '-----'

  l.info 'LOCK STORE:'
  l.info ''

  l.info "locks:"
  @locks.each do |name, lock|
    l.info "#{name}: connections=[#{lock.queue.to_host_a.join(', ')}]"
  end
  l.info ''

  l.info "Connections:"
  @connections.each do |connection, names|
    l.info "#{connection.to_host_s}: names=[#{names.to_a.join(', ')}]"
  end
  l.info ''

  l.info "Acquire Rate: #{@acquire_counter.to_f / 5}/s"
  @acquire_counter = 0

  l.info '-----'
end

#my_locks(connection) ⇒ Object



159
160
161
162
# File 'lib/officer/lock_store.rb', line 159

def my_locks connection
  my_locks = @connections[connection] ? @connections[connection].to_a : []
  connection.my_locks my_locks
end

#release(name, connection, options = {}) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/officer/lock_store.rb', line 79

def release name, connection, options={}
  if options[:callback].nil?
    options[:callback] = true
  end

  lock = @locks[name]
  names = @connections[connection]

  # Client should only be able to release a lock that
  # exists and that it has previously queued.
  if lock.nil? || !names.include?(name)
    connection.release_failed(name) if options[:callback]
    return
  end

  # If connecton has the lock, release it and let the next
  # connection know that it has acquired the lock.
  if lock.queue.first == connection
    lock.queue.shift
    connection.released name if options[:callback]

    if next_connection = lock.queue.first
      next_connection.acquired name
    else
      @locks.delete name
    end

  # If the connection is queued and doesn't have the lock,
  # dequeue it and leave the other connections alone.
  else
    lock.queue.delete connection
    connection.released name
  end

  names.delete name
end

#reset(connection) ⇒ Object



116
117
118
119
120
121
122
123
124
125
# File 'lib/officer/lock_store.rb', line 116

def reset connection
  names = @connections[connection] || Set.new

  names.each do |name|
    release name, connection, :callback => false
  end

  @connections.delete connection
  connection.reset_succeeded
end

#timeout(name, connection) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
# File 'lib/officer/lock_store.rb', line 127

def timeout name, connection
  lock = @locks[name]
  names = @connections[connection]

  return if lock.queue.first == connection # Don't timeout- already have the lock.

  lock.queue.delete connection
  names.delete name

  connection.timed_out name, :queue => lock.queue.to_host_a
end