Class: Sidekiq::LimitFetch::Global::Semaphore

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/limit_fetch/global/semaphore.rb

Constant Summary collapse

PREFIX =
'limit_fetch'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name) ⇒ Semaphore

Returns a new instance of Semaphore.



7
8
9
10
11
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 7

# Sets up the per-queue semaphore state.
#
# @param name [Object] queue identifier; stored as-is and later interpolated
#   into the Redis key names built by the other methods
def initialize(name)
  # Process-local busy counter starts at zero; @lock serializes its updates.
  @local_busy = 0
  @lock = Mutex.new
  @name = name
end

Instance Attribute Details

#local_busy ⇒ Object (readonly)

Returns the value of attribute local_busy.



5
6
7
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 5

# Reader for the process-local busy counter.
#
# @return [Integer] current value of @local_busy (read without taking @lock)
def local_busy
  @local_busy
end

Instance Method Details

#acquire ⇒ Object



45
46
47
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 45

# Asks Selector to acquire this single queue and reports whether anything
# came back.
#
# @return [Boolean] true when the collection returned by Selector.acquire
#   has a size greater than zero
def acquire
  acquired = Selector.acquire([@name], namespace)
  acquired.size.positive?
end

#block ⇒ Object



95
96
97
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 95

# Sets the block flag for this queue in Redis.
#
# The flag is sent as the String 'true' instead of the boolean +true+: that is
# what a client stringifying its arguments would send anyway, and it avoids
# relying on the client accepting boolean command arguments (redis-client,
# for example, rejects them).
#
# @return [Object] whatever the Redis client returns for SET
def block
  redis { |conn| conn.set "#{PREFIX}:block:#{@name}", 'true' }
end

#block_except(*queues) ⇒ Object

Raises:

  • (ArgumentError)


99
100
101
102
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 99

# Sets the block flag for this queue, recording the given queue names as a
# comma-separated list in the flag's value.
#
# Names may be passed individually or as an array; both forms produce the
# same stored value. An empty array is treated like no arguments at all.
#
# @param queues [Array<#to_s>] queue names to store in the flag value
# @raise [ArgumentError] when no queue name is given
# @return [Object] whatever the Redis client returns for SET
def block_except(*queues)
  # Flatten first so that block_except([]) is rejected just like block_except()
  # instead of silently writing an empty list.
  names = queues.flatten
  raise ArgumentError, 'at least one queue name is required' if names.empty?

  redis { |conn| conn.set "#{PREFIX}:block:#{@name}", names.join(',') }
end

#blocking? ⇒ Boolean

Returns:

  • (Boolean)


108
109
110
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 108

# Reads the block flag stored for this queue.
#
# @return [Object] the client's reply to GET for the block key; the value is
#   returned as-is and is not coerced to a strict true/false
def blocking?
  block_key = "#{PREFIX}:block:#{@name}"
  redis { |conn| conn.get(block_key) }
end

#busy ⇒ Object



53
54
55
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 53

# Length of the Redis list that tracks busy entries for this queue.
#
# @return [Object] the client's reply to LLEN on the busy list
def busy
  busy_key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.llen(busy_key) }
end

#busy_processes ⇒ Object



57
58
59
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 57

# Full contents of the Redis list that tracks busy entries for this queue.
#
# @return [Object] the client's reply to LRANGE 0..-1 on the busy list
def busy_processes
  busy_key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.lrange(busy_key, 0, -1) }
end

#decrease_busy ⇒ Object



66
67
68
69
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 66

# Decrements the process-local counter, then removes one occurrence of this
# process's Selector.uuid from the queue's busy list in Redis.
#
# @return [Object] the client's reply to LREM
def decrease_busy
  decrease_local_busy

  busy_key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.lrem(busy_key, 1, Selector.uuid) }
end

#decrease_local_busy ⇒ Object



116
117
118
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 116

# Decrements the process-local busy counter while holding @lock.
#
# @return [Integer] the counter value after the decrement
def decrease_local_busy
  @lock.synchronize do
    @local_busy -= 1
  end
end

#explain ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 124

# Builds a multi-line, human-readable report about this queue's state.
#
# The report interpolates, in order: Selector.uuid, Monitor.all_processes,
# Monitor.old_processes, the sorted probed and busy process lists, the
# inspected limit and process limit, and the raw value of blocking?.
# Each interpolated call is evaluated when the heredoc is built, so this
# method performs every lookup those helpers perform.
#
# The heredoc text is post-processed with gsub(/^ {8}/, ''), which strips
# exactly eight leading spaces from every line that starts with them.
# NOTE(review): as the template is indented here (4-6 spaces) the pattern
# would not match the template lines themselves - confirm against the
# indentation used in the real source file.
#
# @return [String] the formatted report
def explain
  <<-END.gsub(/^ {8}/, '')
    Current sidekiq process: #{Selector.uuid}

      All processes:
    #{Monitor.all_processes.join "\n"}

      Stale processes:
    #{Monitor.old_processes.join "\n"}

      Locked queue processes:
    #{probed_processes.sort.join "\n"}

      Busy queue processes:
    #{busy_processes.sort.join "\n"}

      Limit:
    #{limit.inspect}

      Process limit:
    #{process_limit.inspect}

      Blocking:
    #{blocking?}
  END
end

#increase_busy ⇒ Object



61
62
63
64
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 61

# Increments the process-local counter, then appends this process's
# Selector.uuid to the queue's busy list in Redis.
#
# @return [Object] the client's reply to RPUSH
def increase_busy
  increase_local_busy

  busy_key = "#{PREFIX}:busy:#{@name}"
  redis { |conn| conn.rpush(busy_key, Selector.uuid) }
end

#increase_local_busy ⇒ Object



112
113
114
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 112

# Increments the process-local busy counter while holding @lock.
#
# @return [Integer] the counter value after the increment
def increase_local_busy
  @lock.synchronize do
    @local_busy += 1
  end
end

#limit ⇒ Object



13
14
15
16
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 13

# Reads the limit stored in Redis for this queue.
#
# @return [Integer, nil] the stored value converted with to_i, or nil when
#   the client returns nothing for the key
def limit
  raw = redis { |conn| conn.get("#{PREFIX}:limit:#{@name}") }
  raw ? raw.to_i : nil
end

#limit=(value) ⇒ Object



18
19
20
21
22
23
24
25
26
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 18

# Stores or clears the limit for this queue and remembers that it was changed.
#
# @param value [Object, nil] new limit; a falsy value deletes the key instead
# @return [Object] the client's reply to SET or DEL (only visible when the
#   method is invoked via send, since assignment syntax yields the argument)
def limit=(value)
  @limit_changed = true
  limit_key = "#{PREFIX}:limit:#{@name}"

  redis do |conn|
    value ? conn.set(limit_key, value) : conn.del(limit_key)
  end
end

#limit_changed? ⇒ Boolean

Returns:

  • (Boolean)


28
29
30
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 28

# Reports whether limit= has been called on this instance.
#
# @return [true, nil] true once limit= has run; nil before that, because
#   @limit_changed is only ever assigned inside limit=
def limit_changed?
  @limit_changed
end

#local_busy? ⇒ Boolean

Returns:

  • (Boolean)


120
121
122
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 120

# Reports whether the process-local busy counter is above zero.
#
# @return [Boolean] true when @local_busy is greater than zero
def local_busy?
  @local_busy.positive?
end

#pause ⇒ Object



79
80
81
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 79

# Sets the pause flag for this queue in Redis.
#
# The flag is sent as the String 'true' instead of the boolean +true+: that is
# what a client stringifying its arguments would send anyway, and it avoids
# relying on the client accepting boolean command arguments (redis-client,
# for example, rejects them).
#
# @return [Object] whatever the Redis client returns for SET
def pause
  redis { |conn| conn.set "#{PREFIX}:pause:#{@name}", 'true' }
end

#pause_for_ms(ms) ⇒ Object



83
84
85
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 83

# Sets the pause flag for this queue with an expiry, using PSETEX.
#
# The flag is sent as the String 'true' instead of the boolean +true+: that is
# what a client stringifying its arguments would send anyway, and it avoids
# relying on the client accepting boolean command arguments (redis-client,
# for example, rejects them).
#
# @param ms [Integer] time-to-live passed to PSETEX (milliseconds)
# @return [Object] whatever the Redis client returns for PSETEX
def pause_for_ms(ms)
  redis { |conn| conn.psetex "#{PREFIX}:pause:#{@name}", ms, 'true' }
end

#paused? ⇒ Boolean

Returns:

  • (Boolean)


91
92
93
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 91

# Reads the pause flag stored for this queue.
#
# @return [Object] the client's reply to GET for the pause key; the value is
#   returned as-is and is not coerced to a strict true/false
def paused?
  pause_key = "#{PREFIX}:pause:#{@name}"
  redis { |conn| conn.get(pause_key) }
end

#probed ⇒ Object



71
72
73
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 71

# Length of the Redis list that tracks probed entries for this queue.
#
# @return [Object] the client's reply to LLEN on the probed list
def probed
  probed_key = "#{PREFIX}:probed:#{@name}"
  redis { |conn| conn.llen(probed_key) }
end

#probed_processes ⇒ Object



75
76
77
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 75

# Full contents of the Redis list that tracks probed entries for this queue.
#
# @return [Object] the client's reply to LRANGE 0..-1 on the probed list
def probed_processes
  probed_key = "#{PREFIX}:probed:#{@name}"
  redis { |conn| conn.lrange(probed_key, 0, -1) }
end

#process_limit ⇒ Object



32
33
34
35
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 32

# Reads the per-process limit stored in Redis for this queue.
#
# @return [Integer, nil] the stored value converted with to_i, or nil when
#   the client returns nothing for the key
def process_limit
  raw = redis { |conn| conn.get("#{PREFIX}:process_limit:#{@name}") }
  raw ? raw.to_i : nil
end

#process_limit=(value) ⇒ Object



37
38
39
40
41
42
43
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 37

# Stores or clears the per-process limit for this queue.
#
# @param value [Object, nil] new limit; a falsy value deletes the key instead
# @return [Object] the client's reply to SET or DEL (only visible when the
#   method is invoked via send, since assignment syntax yields the argument)
def process_limit=(value)
  limit_key = "#{PREFIX}:process_limit:#{@name}"

  redis do |conn|
    value ? conn.set(limit_key, value) : conn.del(limit_key)
  end
end

#release ⇒ Object



49
50
51
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 49

# Removes one occurrence of this process's Selector.uuid from the queue's
# probed list in Redis.
#
# @return [Object] the client's reply to LREM
def release
  probed_key = "#{PREFIX}:probed:#{@name}"
  redis { |conn| conn.lrem(probed_key, 1, Selector.uuid) }
end

#remove_lock!(process) ⇒ Object



158
159
160
161
162
163
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 158

# Removes every occurrence of +process+ from this queue's probed list and
# then from its busy list (LREM with count 0), inside one redis block.
#
# @param process [Object] identifier to remove from both lists
# @return [Object] the client's reply to the second LREM (busy list)
def remove_lock!(process)
  list_keys = %w[probed busy].map { |kind| "#{PREFIX}:#{kind}:#{@name}" }

  redis do |conn|
    # Keep the original order (probed first) and return the last reply.
    list_keys.map { |key| conn.lrem(key, 0, process) }.last
  end
end

#remove_locks_except!(processes) ⇒ Object



151
152
153
154
155
156
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 151

# Calls remove_lock! for every distinct entry of the probed list that is not
# included in +processes+.
#
# NOTE(review): only the probed list is scanned; an identifier present solely
# in the busy list would not be picked up here - confirm whether that matters.
#
# @param processes [Array] identifiers whose locks must be kept
# @return [Array] the identifiers that were removed
def remove_locks_except!(processes)
  stale = probed_processes.uniq - processes
  stale.each { |dead_process| remove_lock!(dead_process) }
end

#unblock ⇒ Object



104
105
106
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 104

# Deletes the block flag for this queue from Redis.
#
# @return [Object] the client's reply to DEL
def unblock
  block_key = "#{PREFIX}:block:#{@name}"
  redis { |conn| conn.del(block_key) }
end

#unpause ⇒ Object



87
88
89
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 87

# Deletes the pause flag for this queue from Redis.
#
# @return [Object] the client's reply to DEL
def unpause
  pause_key = "#{PREFIX}:pause:#{@name}"
  redis { |conn| conn.del(pause_key) }
end