Class: Sidekiq::LimitFetch::Global::Semaphore

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/limit_fetch/global/semaphore.rb

Constant Summary collapse

PREFIX =
'limit_fetch'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name) ⇒ Semaphore

Returns a new instance of Semaphore.



11
12
13
14
15
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 11

# Sets up the per-queue semaphore state held in this process.
#
# @param name [Object] queue name; interpolated into every key this
#   object builds
def initialize(name)
  # Guards the in-process busy counter (see the *_local_busy methods).
  @lock = Mutex.new
  @name = name
  @local_busy = 0
end

Instance Attribute Details

#local_busy ⇒ Object (readonly)

Returns the value of attribute local_busy.



9
10
11
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 9

# Reader for the in-process busy counter.
#
# @return [Integer] current value of @local_busy (starts at 0 and is
#   adjusted by increase_local_busy / decrease_local_busy)
def local_busy
  @local_busy
end

Instance Method Details

#acquire ⇒ Object



49
50
51
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 49

# Asks Selector to acquire this single queue.
#
# @return [Boolean] true when Selector.acquire returned a non-empty
#   collection for this queue name, false otherwise
def acquire
  acquired = Selector.acquire([@name], namespace)
  acquired.size.positive?
end

#block ⇒ Object



99
100
101
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 99

# Marks this queue as blocking by storing '1' under its block key.
#
# @return [Object] whatever the `redis` helper returns for the SET call
def block
  block_key = "#{PREFIX}:block:#{@name}"
  redis { |conn| conn.set(block_key, '1') }
end

#block_except(*queues) ⇒ Object

Raises:

  • (ArgumentError)


103
104
105
106
107
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 103

# Marks this queue as blocking for every queue except the given ones by
# storing their comma-joined names under the block key.
#
# Names may be passed as separate arguments or as (nested) arrays; they
# are flattened first so that an empty array is rejected instead of
# silently writing an empty string.
#
# @param queues [Array<String, Array>] names of queues that stay unblocked
# @raise [ArgumentError] when no queue name is given
# @return [Object] whatever the `redis` helper returns for the SET call
def block_except(*queues)
  names = queues.flatten
  raise ArgumentError, 'block_except requires at least one queue name' if names.empty?

  redis { |it| it.set "#{PREFIX}:block:#{@name}", names.join(',') }
end

#blocking? ⇒ Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 113

# Tells whether the block key for this queue currently holds '1'.
#
# NOTE(review): block_except stores a comma-joined queue list under the
# same key, in which case this returns false — confirm that is intended.
#
# @return [Boolean]
def blocking?
  flag = redis { |conn| conn.get("#{PREFIX}:block:#{@name}") }
  flag == '1'
end

#busy ⇒ Object



57
58
59
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 57

# Length of the busy list stored for this queue.
#
# @return [Object] result of LLEN on the busy key, as returned by the
#   `redis` helper
def busy
  busy_key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.llen(busy_key) }
end

#busy_processes ⇒ Object



61
62
63
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 61

# Every entry of the busy list stored for this queue.
#
# @return [Object] result of LRANGE 0..-1 on the busy key, as returned
#   by the `redis` helper
def busy_processes
  busy_key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.lrange(busy_key, 0, -1) }
end

#clear_limits ⇒ Object



117
118
119
120
121
122
123
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 117

# Deletes every key this class keeps for the queue: block, busy, limit,
# pause, probed and process_limit. Keys are deleted one at a time, in
# that order, inside a single `redis` block.
#
# @return [Object] whatever the `redis` helper returns for the block
def clear_limits
  suffixes = %w[block busy limit pause probed process_limit]

  redis do |conn|
    suffixes.each { |suffix| conn.del("#{PREFIX}:#{suffix}:#{@name}") }
  end
end

#decrease_busy ⇒ Object



70
71
72
73
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 70

# Decrements the in-process counter, then removes one occurrence of this
# process's id (Selector.uuid) from the queue's busy list.
#
# @return [Object] whatever the `redis` helper returns for the LREM call
def decrease_busy
  decrease_local_busy

  busy_key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.lrem(busy_key, 1, Selector.uuid) }
end

#decrease_local_busy ⇒ Object



129
130
131
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 129

# Subtracts one from the in-process busy counter while holding @lock.
#
# @return [Integer] the counter value after the decrement
def decrease_local_busy
  @lock.synchronize do
    @local_busy -= 1
  end
end

#explain ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 137

# Builds a multi-line, human-readable report about this queue's state.
#
# The report interpolates, in order: the current process id
# (Selector.uuid), Monitor.all_processes, Monitor.old_processes (labelled
# "Stale"), the sorted probed list (labelled "Locked"), the sorted busy
# list, the limit, the process limit and the blocking? flag.
#
# NOTE(review): the gsub strips eight leading spaces from every line of
# the finished string (after interpolation). As displayed here the heredoc
# lines are not indented that far, so the indentation in the real file
# should be checked before touching this method.
#
# @return [String]
def explain
  <<-INFO.gsub(/^ {8}/, '')
Current sidekiq process: #{Selector.uuid}

  All processes:
#{Monitor.all_processes.join "\n"}

  Stale processes:
#{Monitor.old_processes.join "\n"}

  Locked queue processes:
#{probed_processes.sort.join "\n"}

  Busy queue processes:
#{busy_processes.sort.join "\n"}

  Limit:
#{limit.inspect}

  Process limit:
#{process_limit.inspect}

  Blocking:
#{blocking?}
  INFO
end

#increase_busy ⇒ Object



65
66
67
68
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 65

# Increments the in-process counter, then appends this process's id
# (Selector.uuid) to the queue's busy list.
#
# @return [Object] whatever the `redis` helper returns for the RPUSH call
def increase_busy
  increase_local_busy

  busy_key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.rpush(busy_key, Selector.uuid) }
end

#increase_local_busy ⇒ Object



125
126
127
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 125

# Adds one to the in-process busy counter while holding @lock.
#
# @return [Integer] the counter value after the increment
def increase_local_busy
  @lock.synchronize do
    @local_busy += 1
  end
end

#limit ⇒ Object



17
18
19
20
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 17

# Reads the limit stored for this queue.
#
# @return [Integer, nil] the stored value converted with to_i, or nil
#   when the lookup returned nil
def limit
  raw = redis { |conn| conn.get("#{PREFIX}:limit:#{@name}") }
  return if raw.nil?

  raw.to_i
end

#limit=(value) ⇒ Object



22
23
24
25
26
27
28
29
30
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 22

# Stores or clears the limit for this queue and records that the limit
# was touched on this instance (see limit_changed?).
#
# @param value [Object, nil] value to store; a falsy value deletes the key
# @return [Object] whatever the `redis` helper returns for the SET/DEL call
def limit=(value)
  @limit_changed = true
  limit_key = "#{PREFIX}:limit:#{@name}"

  redis do |conn|
    value ? conn.set(limit_key, value) : conn.del(limit_key)
  end
end

#limit_changed? ⇒ Boolean

Returns:

  • (Boolean)


32
33
34
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 32

# Tells whether `limit=` has been called on this instance.
#
# The flag is only ever assigned inside `limit=`, so before the first
# assignment the instance variable is unset; the result is coerced so the
# predicate always answers with a real boolean instead of nil.
#
# @return [Boolean]
def limit_changed?
  @limit_changed ? true : false
end

#local_busy? ⇒ Boolean

Returns:

  • (Boolean)


133
134
135
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 133

# Tells whether the in-process busy counter is above zero.
#
# @return [Boolean]
def local_busy?
  @local_busy > 0
end

#pause ⇒ Object



83
84
85
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 83

# Marks this queue as paused by storing '1' under its pause key.
#
# @return [Object] whatever the `redis` helper returns for the SET call
def pause
  pause_key = "#{PREFIX}:pause:#{@name}"
  redis { |conn| conn.set(pause_key, '1') }
end

#pause_for_ms(milliseconds) ⇒ Object



87
88
89
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 87

# Marks this queue as paused with an expiry: writes 1 under the pause key
# using PSETEX with the given time-to-live.
#
# @param milliseconds [Integer] expiry passed straight to PSETEX
# @return [Object] whatever the `redis` helper returns for the PSETEX call
def pause_for_ms(milliseconds)
  pause_key = "#{PREFIX}:pause:#{@name}"
  redis { |conn| conn.psetex(pause_key, milliseconds, 1) }
end

#paused? ⇒ Boolean

Returns:

  • (Boolean)


95
96
97
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 95

# Tells whether the pause key for this queue currently holds '1'.
#
# @return [Boolean]
def paused?
  flag = redis { |conn| conn.get("#{PREFIX}:pause:#{@name}") }
  flag == '1'
end

#probed ⇒ Object



75
76
77
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 75

# Length of the probed list stored for this queue.
#
# @return [Object] result of LLEN on the probed key, as returned by the
#   `redis` helper
def probed
  probed_key = "#{PREFIX}:probed:#{@name}"
  redis { |conn| conn.llen(probed_key) }
end

#probed_processes ⇒ Object



79
80
81
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 79

# Every entry of the probed list stored for this queue.
#
# @return [Object] result of LRANGE 0..-1 on the probed key, as returned
#   by the `redis` helper
def probed_processes
  probed_key = "#{PREFIX}:probed:#{@name}"
  redis { |conn| conn.lrange(probed_key, 0, -1) }
end

#process_limit ⇒ Object



36
37
38
39
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 36

# Reads the per-process limit stored for this queue.
#
# @return [Integer, nil] the stored value converted with to_i, or nil
#   when the lookup returned nil
def process_limit
  raw = redis { |conn| conn.get("#{PREFIX}:process_limit:#{@name}") }
  return if raw.nil?

  raw.to_i
end

#process_limit=(value) ⇒ Object



41
42
43
44
45
46
47
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 41

# Stores or clears the per-process limit for this queue.
#
# @param value [Object, nil] value to store; a falsy value deletes the key
# @return [Object] whatever the `redis` helper returns for the SET/DEL call
def process_limit=(value)
  limit_key = "#{PREFIX}:process_limit:#{@name}"

  redis do |conn|
    value ? conn.set(limit_key, value) : conn.del(limit_key)
  end
end

#release ⇒ Object



53
54
55
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 53

# Removes one occurrence of this process's id (Selector.uuid) from the
# queue's probed list.
#
# @return [Object] whatever the `redis` helper returns for the LREM call
def release
  probed_key = "#{PREFIX}:probed:#{@name}"
  redis { |conn| conn.lrem(probed_key, 1, Selector.uuid) }
end

#remove_lock!(process) ⇒ Object



171
172
173
174
175
176
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 171

# Removes every occurrence (LREM count 0) of the given process id from
# this queue's probed list and then from its busy list.
#
# @param process [String] process id to purge
# @return [Object] whatever the `redis` helper returns for the block
#   (the block's last expression is the busy-list LREM)
def remove_lock!(process)
  probed_key = "#{PREFIX}:probed:#{@name}"
  busy_key = "#{PREFIX}:busy:#{@name}"

  redis do |conn|
    conn.lrem(probed_key, 0, process)
    conn.lrem(busy_key, 0, process)
  end
end

#remove_locks_except!(processes) ⇒ Object



164
165
166
167
168
169
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 164

# Purges this queue's entries for every process that is not in the given
# list of live processes.
#
# Candidates are collected from both the probed and the busy list:
# remove_lock! cleans both, so a stale process that is left only in the
# busy list must be discovered as well, otherwise its entry would never
# be removed.
#
# @param processes [Array<String>] ids of processes to keep
# @return [Array<String>] the process ids that were purged
def remove_locks_except!(processes)
  # Array#| yields the de-duplicated union of both lists.
  lock_holders = probed_processes | busy_processes

  (lock_holders - processes).each do |dead_process|
    remove_lock! dead_process
  end
end

#unblock ⇒ Object



109
110
111
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 109

# Clears the blocking state by deleting this queue's block key.
#
# @return [Object] whatever the `redis` helper returns for the DEL call
def unblock
  block_key = "#{PREFIX}:block:#{@name}"
  redis { |conn| conn.del(block_key) }
end

#unpause ⇒ Object



91
92
93
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 91

# Clears the paused state by deleting this queue's pause key.
#
# @return [Object] whatever the `redis` helper returns for the DEL call
def unpause
  pause_key = "#{PREFIX}:pause:#{@name}"
  redis { |conn| conn.del(pause_key) }
end