Class: Sidekiq::LimitFetch::Global::Semaphore

Inherits:
Object
  • Object
show all
Includes:
Redis
Defined in:
lib/sidekiq/limit_fetch/global/semaphore.rb

Constant Summary collapse

PREFIX =
'limit_fetch'

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Redis

#determine_namespace, #nonblocking_redis, #redis

Constructor Details

#initialize(name) ⇒ Semaphore

Returns a new instance of Semaphore.



9
10
11
12
13
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 9

def initialize(name)
  @name = name
  @lock = Mutex.new
  @local_busy = 0
end

Instance Attribute Details

#local_busyObject (readonly)

Returns the value of attribute local_busy.



7
8
9
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 7

def local_busy
  @local_busy
end

Instance Method Details

#acquireObject



41
42
43
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 41

def acquire
  Selector.acquire([@name], determine_namespace).size > 0
end

#blockObject



87
88
89
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 87

def block
  redis {|it| it.set "#{PREFIX}:block:#@name", true }
end

#block_except(*queues) ⇒ Object

Raises:

  • (ArgumentError)


91
92
93
94
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 91

def block_except(*queues)
  raise ArgumentError if queues.empty?
  redis {|it| it.set "#{PREFIX}:block:#@name", queues.join(',') }
end

#blocking?Boolean

Returns:

  • (Boolean)


100
101
102
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 100

def blocking?
  redis {|it| it.get "#{PREFIX}:block:#@name" }
end

#busyObject



49
50
51
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 49

def busy
  redis {|it| it.llen "#{PREFIX}:busy:#@name" }
end

#busy_processesObject



53
54
55
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 53

def busy_processes
  redis {|it| it.lrange "#{PREFIX}:busy:#@name", 0, -1 }
end

#decrease_busyObject



62
63
64
65
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 62

def decrease_busy
  decrease_local_busy
  redis {|it| it.lrem "#{PREFIX}:busy:#@name", 1, Selector.uuid }
end

#decrease_local_busyObject



108
109
110
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 108

def decrease_local_busy
  @lock.synchronize { @local_busy -= 1 }
end

#explainObject



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 116

def explain
  <<-END.gsub(/^ {8}/, '')
    Current sidekiq process: #{Selector.uuid}

      All processes:
    #{Monitor.all_processes.join "\n"}

      Stale processes:
    #{Monitor.old_processes.join "\n"}

      Locked queue processes:
    #{probed_processes.sort.join "\n"}

      Busy queue processes:
    #{busy_processes.sort.join "\n"}

      Limit:
    #{limit.inspect}

      Process limit:
    #{process_limit.inspect}

      Blocking:
    #{blocking?}
  END
end

#increase_busyObject



57
58
59
60
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 57

def increase_busy
  increase_local_busy
  redis {|it| it.rpush "#{PREFIX}:busy:#@name", Selector.uuid }
end

#increase_local_busyObject



104
105
106
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 104

def increase_local_busy
  @lock.synchronize { @local_busy += 1 }
end

#limitObject



15
16
17
18
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 15

def limit
  value = redis {|it| it.get "#{PREFIX}:limit:#@name" }
  value.to_i if value
end

#limit=(value) ⇒ Object



20
21
22
23
24
25
26
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 20

def limit=(value)
  if value
    redis {|it| it.set "#{PREFIX}:limit:#@name", value }
  else
    redis {|it| it.del "#{PREFIX}:limit:#@name" }
  end
end

#local_busy?Boolean

Returns:

  • (Boolean)


112
113
114
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 112

def local_busy?
  @local_busy > 0
end

#pauseObject



75
76
77
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 75

def pause
  redis {|it| it.set "#{PREFIX}:pause:#@name", true }
end

#paused?Boolean

Returns:

  • (Boolean)


83
84
85
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 83

def paused?
  redis {|it| it.get "#{PREFIX}:pause:#@name" }
end

#probedObject



67
68
69
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 67

def probed
  redis {|it| it.llen "#{PREFIX}:probed:#@name" }
end

#probed_processesObject



71
72
73
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 71

def probed_processes
  redis {|it| it.lrange "#{PREFIX}:probed:#@name", 0, -1 }
end

#process_limitObject



28
29
30
31
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 28

def process_limit
  value = redis {|it| it.get "#{PREFIX}:process_limit:#@name" }
  value.to_i if value
end

#process_limit=(value) ⇒ Object



33
34
35
36
37
38
39
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 33

def process_limit=(value)
  if value
    redis {|it| it.set "#{PREFIX}:process_limit:#@name", value }
  else
    redis {|it| it.del "#{PREFIX}:process_limit:#@name" }
  end
end

#releaseObject



45
46
47
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 45

def release
  redis {|it| it.lrem "#{PREFIX}:probed:#@name", 1, Selector.uuid }
end

#remove_lock!(process) ⇒ Object



150
151
152
153
154
155
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 150

def remove_lock!(process)
  redis do |it|
    it.lrem "#{PREFIX}:probed:#@name", 0, process
    it.lrem "#{PREFIX}:busy:#@name", 0, process
  end
end

#remove_locks_except!(processes) ⇒ Object



143
144
145
146
147
148
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 143

def remove_locks_except!(processes)
  locked_processes = probed_processes.uniq
  (locked_processes - processes).each do |dead_process|
    remove_lock! dead_process
  end
end

#unblockObject



96
97
98
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 96

def unblock
  redis {|it| it.del "#{PREFIX}:block:#@name" }
end

#unpauseObject



79
80
81
# File 'lib/sidekiq/limit_fetch/global/semaphore.rb', line 79

def unpause
  redis {|it| it.del "#{PREFIX}:pause:#@name" }
end