Class: Sidekiq::LimitFetch::Global::Semaphore
- Inherits:
-
Object
- Object
- Sidekiq::LimitFetch::Global::Semaphore
show all
- Includes:
- Redis
- Defined in:
- lib/sidekiq/limit_fetch/global/semaphore.rb
Constant Summary
collapse
- PREFIX =
'limit_fetch'
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Redis
#determine_namespace, #nonblocking_redis, #redis
Constructor Details
#initialize(name) ⇒ Semaphore
Returns a new instance of Semaphore.
9
10
11
12
13
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 9
def initialize(name)
@name = name
@lock = Mutex.new
@local_busy = 0
end
|
Instance Attribute Details
#local_busy ⇒ Object
Returns the value of attribute local_busy.
7
8
9
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 7
def local_busy
@local_busy
end
|
Instance Method Details
#acquire ⇒ Object
41
42
43
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 41
def acquire
Selector.acquire([@name], determine_namespace).size > 0
end
|
#block ⇒ Object
87
88
89
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 87
def block
redis {|it| it.set "#{PREFIX}:block:#@name", true }
end
|
#block_except(*queues) ⇒ Object
91
92
93
94
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 91
def block_except(*queues)
raise ArgumentError if queues.empty?
redis {|it| it.set "#{PREFIX}:block:#@name", queues.join(',') }
end
|
#blocking? ⇒ Boolean
100
101
102
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 100
def blocking?
redis {|it| it.get "#{PREFIX}:block:#@name" }
end
|
#busy ⇒ Object
49
50
51
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 49
def busy
redis {|it| it.llen "#{PREFIX}:busy:#@name" }
end
|
#busy_processes ⇒ Object
53
54
55
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 53
def busy_processes
redis {|it| it.lrange "#{PREFIX}:busy:#@name", 0, -1 }
end
|
#decrease_busy ⇒ Object
62
63
64
65
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 62
def decrease_busy
decrease_local_busy
redis {|it| it.lrem "#{PREFIX}:busy:#@name", 1, Selector.uuid }
end
|
#decrease_local_busy ⇒ Object
108
109
110
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 108
def decrease_local_busy
@lock.synchronize { @local_busy -= 1 }
end
|
#explain ⇒ Object
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 116
def explain
<<-END.gsub(/^ {8}/, '')
Current sidekiq process: #{Selector.uuid}
All processes:
#{Monitor.all_processes.join "\n"}
Stale processes:
#{Monitor.old_processes.join "\n"}
Locked queue processes:
#{probed_processes.sort.join "\n"}
Busy queue processes:
#{busy_processes.sort.join "\n"}
Limit:
#{limit.inspect}
Process limit:
#{process_limit.inspect}
Blocking:
#{blocking?}
END
end
|
#increase_busy ⇒ Object
57
58
59
60
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 57
def increase_busy
increase_local_busy
redis {|it| it.rpush "#{PREFIX}:busy:#@name", Selector.uuid }
end
|
#increase_local_busy ⇒ Object
104
105
106
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 104
def increase_local_busy
@lock.synchronize { @local_busy += 1 }
end
|
#limit ⇒ Object
15
16
17
18
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 15
def limit
value = redis {|it| it.get "#{PREFIX}:limit:#@name" }
value.to_i if value
end
|
#limit=(value) ⇒ Object
20
21
22
23
24
25
26
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 20
def limit=(value)
if value
redis {|it| it.set "#{PREFIX}:limit:#@name", value }
else
redis {|it| it.del "#{PREFIX}:limit:#@name" }
end
end
|
#local_busy? ⇒ Boolean
112
113
114
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 112
def local_busy?
@local_busy > 0
end
|
#pause ⇒ Object
75
76
77
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 75
def pause
redis {|it| it.set "#{PREFIX}:pause:#@name", true }
end
|
#paused? ⇒ Boolean
83
84
85
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 83
def paused?
redis {|it| it.get "#{PREFIX}:pause:#@name" }
end
|
#probed ⇒ Object
67
68
69
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 67
def probed
redis {|it| it.llen "#{PREFIX}:probed:#@name" }
end
|
#probed_processes ⇒ Object
71
72
73
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 71
def probed_processes
redis {|it| it.lrange "#{PREFIX}:probed:#@name", 0, -1 }
end
|
#process_limit ⇒ Object
28
29
30
31
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 28
def process_limit
value = redis {|it| it.get "#{PREFIX}:process_limit:#@name" }
value.to_i if value
end
|
#process_limit=(value) ⇒ Object
33
34
35
36
37
38
39
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 33
def process_limit=(value)
if value
redis {|it| it.set "#{PREFIX}:process_limit:#@name", value }
else
redis {|it| it.del "#{PREFIX}:process_limit:#@name" }
end
end
|
#release ⇒ Object
45
46
47
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 45
def release
redis {|it| it.lrem "#{PREFIX}:probed:#@name", 1, Selector.uuid }
end
|
#remove_lock!(process) ⇒ Object
150
151
152
153
154
155
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 150
def remove_lock!(process)
redis do |it|
it.lrem "#{PREFIX}:probed:#@name", 0, process
it.lrem "#{PREFIX}:busy:#@name", 0, process
end
end
|
#remove_locks_except!(processes) ⇒ Object
143
144
145
146
147
148
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 143
def remove_locks_except!(processes)
locked_processes = probed_processes.uniq
(locked_processes - processes).each do |dead_process|
remove_lock! dead_process
end
end
|
#unblock ⇒ Object
96
97
98
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 96
def unblock
redis {|it| it.del "#{PREFIX}:block:#@name" }
end
|
#unpause ⇒ Object
79
80
81
|
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 79
def unpause
redis {|it| it.del "#{PREFIX}:pause:#@name" }
end
|