Class: Sidekiq::LimitFetch::Global::Semaphore
- Inherits:
-
Object
- Object
- Sidekiq::LimitFetch::Global::Semaphore
- Defined in:
- lib/sidekiq/limit_fetch/global/semaphore.rb
Constant Summary collapse
- PREFIX =
'limit_fetch'
Instance Attribute Summary collapse
-
#local_busy ⇒ Object
readonly
Returns the value of attribute local_busy.
Instance Method Summary collapse
- #acquire ⇒ Object
- #block ⇒ Object
- #block_except(*queues) ⇒ Object
- #blocking? ⇒ Boolean
- #busy ⇒ Object
- #busy_processes ⇒ Object
- #clear_limits ⇒ Object
- #decrease_busy ⇒ Object
- #decrease_local_busy ⇒ Object
- #explain ⇒ Object
- #increase_busy ⇒ Object
- #increase_local_busy ⇒ Object
-
#initialize(name) ⇒ Semaphore
constructor
A new instance of Semaphore.
- #limit ⇒ Object
- #limit=(value) ⇒ Object
- #limit_changed? ⇒ Boolean
- #local_busy? ⇒ Boolean
- #pause ⇒ Object
- #pause_for_ms(milliseconds) ⇒ Object
- #paused? ⇒ Boolean
- #probed ⇒ Object
- #probed_processes ⇒ Object
- #process_limit ⇒ Object
- #process_limit=(value) ⇒ Object
- #release ⇒ Object
- #remove_lock!(process) ⇒ Object
- #remove_locks_except!(processes) ⇒ Object
- #unblock ⇒ Object
- #unpause ⇒ Object
Constructor Details
#initialize(name) ⇒ Semaphore
Returns a new instance of Semaphore.
11 12 13 14 15 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 11 def initialize(name) @name = name @lock = Mutex.new @local_busy = 0 end |
Instance Attribute Details
#local_busy ⇒ Object (readonly)
Returns the value of attribute local_busy.
9 10 11 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 9 def local_busy @local_busy end |
Instance Method Details
#acquire ⇒ Object
49 50 51 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 49 def acquire Selector.acquire([@name], namespace).size.positive? end |
#block ⇒ Object
99 100 101 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 99 def block redis { |it| it.set "#{PREFIX}:block:#{@name}", '1' } end |
#block_except(*queues) ⇒ Object
103 104 105 106 107 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 103 def block_except(*queues) raise ArgumentError if queues.empty? redis { |it| it.set "#{PREFIX}:block:#{@name}", queues.join(',') } end |
#blocking? ⇒ Boolean
113 114 115 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 113 def blocking? redis { |it| it.get "#{PREFIX}:block:#{@name}" } == '1' end |
#busy ⇒ Object
57 58 59 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 57 def busy redis { |it| it.llen "#{PREFIX}:busy:#{@name}" } end |
#busy_processes ⇒ Object
61 62 63 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 61 def busy_processes redis { |it| it.lrange "#{PREFIX}:busy:#{@name}", 0, -1 } end |
#clear_limits ⇒ Object
117 118 119 120 121 122 123 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 117 def clear_limits redis do |it| %w[block busy limit pause probed process_limit].each do |key| it.del "#{PREFIX}:#{key}:#{@name}" end end end |
#decrease_busy ⇒ Object
70 71 72 73 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 70 def decrease_busy decrease_local_busy redis { |it| it.lrem "#{PREFIX}:busy:#{@name}", 1, Selector.uuid } end |
#decrease_local_busy ⇒ Object
129 130 131 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 129 def decrease_local_busy @lock.synchronize { @local_busy -= 1 } end |
#explain ⇒ Object
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 137 def explain <<-INFO.gsub(/^ {8}/, '') Current sidekiq process: #{Selector.uuid} All processes: #{Monitor.all_processes.join "\n"} Stale processes: #{Monitor.old_processes.join "\n"} Locked queue processes: #{probed_processes.sort.join "\n"} Busy queue processes: #{busy_processes.sort.join "\n"} Limit: #{limit.inspect} Process limit: #{process_limit.inspect} Blocking: #{blocking?} INFO end |
#increase_busy ⇒ Object
65 66 67 68 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 65 def increase_busy increase_local_busy redis { |it| it.rpush "#{PREFIX}:busy:#{@name}", Selector.uuid } end |
#increase_local_busy ⇒ Object
125 126 127 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 125 def increase_local_busy @lock.synchronize { @local_busy += 1 } end |
#limit ⇒ Object
17 18 19 20 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 17 def limit value = redis { |it| it.get "#{PREFIX}:limit:#{@name}" } value&.to_i end |
#limit=(value) ⇒ Object
22 23 24 25 26 27 28 29 30 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 22 def limit=(value) @limit_changed = true if value redis { |it| it.set "#{PREFIX}:limit:#{@name}", value } else redis { |it| it.del "#{PREFIX}:limit:#{@name}" } end end |
#limit_changed? ⇒ Boolean
32 33 34 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 32 def limit_changed? @limit_changed end |
#local_busy? ⇒ Boolean
133 134 135 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 133 def local_busy? @local_busy.positive? end |
#pause ⇒ Object
83 84 85 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 83 def pause redis { |it| it.set "#{PREFIX}:pause:#{@name}", '1' } end |
#pause_for_ms(milliseconds) ⇒ Object
87 88 89 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 87 def pause_for_ms(milliseconds) redis { |it| it.psetex "#{PREFIX}:pause:#{@name}", milliseconds, 1 } end |
#paused? ⇒ Boolean
95 96 97 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 95 def paused? redis { |it| it.get "#{PREFIX}:pause:#{@name}" } == '1' end |
#probed ⇒ Object
75 76 77 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 75 def probed redis { |it| it.llen "#{PREFIX}:probed:#{@name}" } end |
#probed_processes ⇒ Object
79 80 81 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 79 def probed_processes redis { |it| it.lrange "#{PREFIX}:probed:#{@name}", 0, -1 } end |
#process_limit ⇒ Object
36 37 38 39 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 36 def process_limit value = redis { |it| it.get "#{PREFIX}:process_limit:#{@name}" } value&.to_i end |
#process_limit=(value) ⇒ Object
41 42 43 44 45 46 47 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 41 def process_limit=(value) if value redis { |it| it.set "#{PREFIX}:process_limit:#{@name}", value } else redis { |it| it.del "#{PREFIX}:process_limit:#{@name}" } end end |
#release ⇒ Object
53 54 55 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 53 def release redis { |it| it.lrem "#{PREFIX}:probed:#{@name}", 1, Selector.uuid } end |
#remove_lock!(process) ⇒ Object
171 172 173 174 175 176 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 171 def remove_lock!(process) redis do |it| it.lrem "#{PREFIX}:probed:#{@name}", 0, process it.lrem "#{PREFIX}:busy:#{@name}", 0, process end end |
#remove_locks_except!(processes) ⇒ Object
164 165 166 167 168 169 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 164 def remove_locks_except!(processes) locked_processes = probed_processes.uniq (locked_processes - processes).each do |dead_process| remove_lock! dead_process end end |
#unblock ⇒ Object
109 110 111 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 109 def unblock redis { |it| it.del "#{PREFIX}:block:#{@name}" } end |
#unpause ⇒ Object
91 92 93 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 91 def unpause redis { |it| it.del "#{PREFIX}:pause:#{@name}" } end |