Class: Sidekiq::LimitFetch::Global::Semaphore
- Inherits:
-
Object
- Object
- Sidekiq::LimitFetch::Global::Semaphore
- Defined in:
- lib/sidekiq/limit_fetch/global/semaphore.rb
Constant Summary collapse
- PREFIX =
'limit_fetch'
Instance Attribute Summary collapse
-
#local_busy ⇒ Object
readonly
Returns the value of attribute local_busy.
Instance Method Summary collapse
- #acquire ⇒ Object
- #block ⇒ Object
- #block_except(*queues) ⇒ Object
- #blocking? ⇒ Boolean
- #busy ⇒ Object
- #busy_processes ⇒ Object
- #decrease_busy ⇒ Object
- #decrease_local_busy ⇒ Object
- #explain ⇒ Object
- #increase_busy ⇒ Object
- #increase_local_busy ⇒ Object
-
#initialize(name) ⇒ Semaphore
constructor
A new instance of Semaphore.
- #limit ⇒ Object
- #limit=(value) ⇒ Object
- #limit_changed? ⇒ Boolean
- #local_busy? ⇒ Boolean
- #pause ⇒ Object
- #pause_for_ms(ms) ⇒ Object
- #paused? ⇒ Boolean
- #probed ⇒ Object
- #probed_processes ⇒ Object
- #process_limit ⇒ Object
- #process_limit=(value) ⇒ Object
- #release ⇒ Object
- #remove_lock!(process) ⇒ Object
- #remove_locks_except!(processes) ⇒ Object
- #unblock ⇒ Object
- #unpause ⇒ Object
Constructor Details
#initialize(name) ⇒ Semaphore
Returns a new instance of Semaphore.
7 8 9 10 11 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 7 def initialize(name) @name = name @lock = Mutex.new @local_busy = 0 end |
Instance Attribute Details
#local_busy ⇒ Object (readonly)
Returns the value of attribute local_busy.
5 6 7 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 5 def local_busy @local_busy end |
Instance Method Details
#acquire ⇒ Object
45 46 47 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 45 def acquire Selector.acquire([@name], namespace).size > 0 end |
#block ⇒ Object
95 96 97 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 95 def block redis {|it| it.set "#{PREFIX}:block:#@name", true } end |
#block_except(*queues) ⇒ Object
99 100 101 102 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 99 def block_except(*queues) raise ArgumentError if queues.empty? redis {|it| it.set "#{PREFIX}:block:#@name", queues.join(',') } end |
#blocking? ⇒ Boolean
108 109 110 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 108 def blocking? redis {|it| it.get "#{PREFIX}:block:#@name" } end |
#busy ⇒ Object
53 54 55 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 53 def busy redis {|it| it.llen "#{PREFIX}:busy:#@name" } end |
#busy_processes ⇒ Object
57 58 59 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 57 def busy_processes redis {|it| it.lrange "#{PREFIX}:busy:#@name", 0, -1 } end |
#decrease_busy ⇒ Object
66 67 68 69 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 66 def decrease_busy decrease_local_busy redis {|it| it.lrem "#{PREFIX}:busy:#@name", 1, Selector.uuid } end |
#decrease_local_busy ⇒ Object
116 117 118 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 116 def decrease_local_busy @lock.synchronize { @local_busy -= 1 } end |
#explain ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 124 def explain <<-END.gsub(/^ {8}/, '') Current sidekiq process: #{Selector.uuid} All processes: #{Monitor.all_processes.join "\n"} Stale processes: #{Monitor.old_processes.join "\n"} Locked queue processes: #{probed_processes.sort.join "\n"} Busy queue processes: #{busy_processes.sort.join "\n"} Limit: #{limit.inspect} Process limit: #{process_limit.inspect} Blocking: #{blocking?} END end |
#increase_busy ⇒ Object
61 62 63 64 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 61 def increase_busy increase_local_busy redis {|it| it.rpush "#{PREFIX}:busy:#@name", Selector.uuid } end |
#increase_local_busy ⇒ Object
112 113 114 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 112 def increase_local_busy @lock.synchronize { @local_busy += 1 } end |
#limit ⇒ Object
13 14 15 16 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 13 def limit value = redis {|it| it.get "#{PREFIX}:limit:#@name" } value.to_i if value end |
#limit=(value) ⇒ Object
18 19 20 21 22 23 24 25 26 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 18 def limit=(value) @limit_changed = true if value redis {|it| it.set "#{PREFIX}:limit:#@name", value } else redis {|it| it.del "#{PREFIX}:limit:#@name" } end end |
#limit_changed? ⇒ Boolean
28 29 30 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 28 def limit_changed? @limit_changed end |
#local_busy? ⇒ Boolean
120 121 122 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 120 def local_busy? @local_busy > 0 end |
#pause ⇒ Object
79 80 81 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 79 def pause redis {|it| it.set "#{PREFIX}:pause:#@name", true } end |
#pause_for_ms(ms) ⇒ Object
83 84 85 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 83 def pause_for_ms ms redis {|it| it.psetex "#{PREFIX}:pause:#@name", ms, true } end |
#paused? ⇒ Boolean
91 92 93 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 91 def paused? redis {|it| it.get "#{PREFIX}:pause:#@name" } end |
#probed ⇒ Object
71 72 73 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 71 def probed redis {|it| it.llen "#{PREFIX}:probed:#@name" } end |
#probed_processes ⇒ Object
75 76 77 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 75 def probed_processes redis {|it| it.lrange "#{PREFIX}:probed:#@name", 0, -1 } end |
#process_limit ⇒ Object
32 33 34 35 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 32 def process_limit value = redis {|it| it.get "#{PREFIX}:process_limit:#@name" } value.to_i if value end |
#process_limit=(value) ⇒ Object
37 38 39 40 41 42 43 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 37 def process_limit=(value) if value redis {|it| it.set "#{PREFIX}:process_limit:#@name", value } else redis {|it| it.del "#{PREFIX}:process_limit:#@name" } end end |
#release ⇒ Object
49 50 51 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 49 def release redis {|it| it.lrem "#{PREFIX}:probed:#@name", 1, Selector.uuid } end |
#remove_lock!(process) ⇒ Object
158 159 160 161 162 163 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 158 def remove_lock!(process) redis do |it| it.lrem "#{PREFIX}:probed:#@name", 0, process it.lrem "#{PREFIX}:busy:#@name", 0, process end end |
#remove_locks_except!(processes) ⇒ Object
151 152 153 154 155 156 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 151 def remove_locks_except!(processes) locked_processes = probed_processes.uniq (locked_processes - processes).each do |dead_process| remove_lock! dead_process end end |
#unblock ⇒ Object
104 105 106 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 104 def unblock redis {|it| it.del "#{PREFIX}:block:#@name" } end |
#unpause ⇒ Object
87 88 89 |
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 87 def unpause redis {|it| it.del "#{PREFIX}:pause:#@name" } end |