Class: Sidekiq::Qlimit::QlimitFetch

Inherits:
BasicFetch
  • Object
show all
Defined in:
lib/sidekiq/qlimit/qlimit_fetch.rb

Overview

Throttled version of ‘Sidekiq::QlimitFetch` fetcher strategy.

Just add somewhere in your bootstrap:

require "sidekiq/qlimit"
Sidekiq::Qlimit.setup!

Establish max # of total workers per queue

sidekiq.yml –

:qlimit:
  queue_name_1: 2
  queue_name_2: 4

– TODO: Store current limits in redis and read from redis to display ++

Defined Under Namespace

Classes: UnitOfWork

Constant Summary collapse

@@qlimit_increment_sha =

Redis Script SHA tracking

""
@@qlimit_decrement_sha =
""

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ QlimitFetch

Modified Initialize Function - Reads :qlimit from config source such as sidekiq.yml



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/sidekiq/qlimit/qlimit_fetch.rb', line 56

def initialize(options)
  super(options)
  
  # puts options
  # {:queues=>["unlimited", "limited"], :labels=>[], :concurrency=>5, :require=>".", :environment=>nil, :timeout=>8, :poll_interval_average=>nil, :average_scheduled_poll_interval=>15, :error_handlers=>[#<Sidekiq::ExceptionHandler::Logger:0x007fda0c30fe38>], :lifecycle_events=>{:startup=>[], :quiet=>[], :shutdown=>[]}, :dead_max_jobs=>10000, :dead_timeout_in_seconds=>15552000, :pidfile=>"tmp/pids/sidekiq.pid", :qlimit=>[{"limited"=>2}, {"fake"=>3}], :config_file=>"config/sidekiq.yml", :strict=>true, :fetch=>Sidekiq::Qlimit::QlimitFetch, :tag=>"example"}


  # Get our limits
  @per_queue_limits = {}
  unless options[:qlimit].nil?
    options[:qlimit].each do |limit_hash|
      limit_hash.each do |k, v|
        @per_queue_limits[k] = v
      end
    end
  end

  # TODO: Store current limits in redis and read from redis to display

  QlimitFetch.qlimit_script_load
end

Class Method Details

.bulk_requeue(inprogress, options) ⇒ Object

Used to requeue jobs on sidekiq shutdown/termination



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# File 'lib/sidekiq/qlimit/qlimit_fetch.rb', line 254

def self.bulk_requeue(inprogress, options)
  return if inprogress.empty?

  Sidekiq.logger.debug { "Re-queueing terminated jobs" }
  jobs_to_requeue = {}
  inprogress.each do |unit_of_work|
    jobs_to_requeue[unit_of_work.queue_name] ||= []
    jobs_to_requeue[unit_of_work.queue_name] << unit_of_work.job
  end

  Sidekiq.redis do |conn|
    conn.pipelined do
      jobs_to_requeue.each do |queue, jobs|
        conn.rpush("queue:#{queue}", jobs)
        # Reduce qlimit on requeue of amount: jobs.length
        self.qlimit_decrement(queue, jobs.length)
      end
    end
  end
  Sidekiq.logger.debug("Pushed #{inprogress.size} jobs back to Redis")
rescue => ex
  Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end

.qlimit_decrement(queue, amount = 1) ⇒ Object

Decrement current count of running jobs in queue by amount (default: 1)



218
219
220
221
222
223
224
# File 'lib/sidekiq/qlimit/qlimit_fetch.rb', line 218

def self.qlimit_decrement(queue, amount = 1)
  Sidekiq.logger.debug("Qlimit Decrement: #{queue} by #{amount}")

  Sidekiq.redis do |conn|
    result = conn.evalsha(@@qlimit_decrement_sha,["qlimit:#{queue}"], [amount])
  end
end

.qlimit_get(queue) ⇒ Object

Get current count of running jobs from queue



244
245
246
247
248
249
250
# File 'lib/sidekiq/qlimit/qlimit_fetch.rb', line 244

def self.qlimit_get(queue)
  Sidekiq.redis do |conn|
    result = conn.get("qlimit:#{queue}")
    return result.to_i unless result.nil?
    return 0
  end
end

.qlimit_hashObject

Returns a hash of current count of running jobs in queues

Example: { "queue1": 123, "queue2": 456 }


188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/sidekiq/qlimit/qlimit_fetch.rb', line 188

def self.qlimit_hash
  qlimits = {}
  Sidekiq.redis do |conn|
    conn.keys("qlimit:*").each do |key|
      qkey = key.sub(/.*qlimit:/, ''.freeze)
      qvalue = conn.get(key)
      qvalue ||= 0
      qlimits[qkey] ||= qvalue.to_i
    end
  end
  qlimits
end

.qlimit_increment(queue, limit) ⇒ Object

Increment current count of running jobs in queue by amount NOT to exceed limit

return 1 if incrementing count would NOT exceed limit return 0 if incrementing count would exceed limit



206
207
208
209
210
211
212
213
214
# File 'lib/sidekiq/qlimit/qlimit_fetch.rb', line 206

def self.qlimit_increment(queue, limit)
  return 1 if limit.nil?  # No limit, no processing

  Sidekiq.redis do |conn|
    result = conn.evalsha(@@qlimit_increment_sha,["qlimit:#{queue}"],[limit])
    #Sidekiq.logger.debug("Checking Qlimit #{queue} => #{result}")
    return result
  end
end

.qlimit_reset(queue) ⇒ Object

Set current count of running jobs in queue to 0



228
229
230
# File 'lib/sidekiq/qlimit/qlimit_fetch.rb', line 228

def self.qlimit_reset(queue)
    self.qlimit_set(queue, 0)
end

.qlimit_script_loadObject

Returns a “UnitOfWork” from a qualifying queue if available



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/sidekiq/qlimit/qlimit_fetch.rb', line 143

def self.qlimit_script_load
  # Note:
  # This is not theadsafe.  Instead we *blindly* increment if current < max. 
  # We assume there is little or no penalty for running a few too many workers
  qlimit_increment_script = <<-EOF
      local max = tonumber(ARGV[1])
      local current = tonumber(redis.call('get',KEYS[1]))
      local current_i = 0
      if nil == current then
        current_i = 0
      else
        current_i = tonumber(current)
      end

      if current_i < max then
          redis.call('incr',KEYS[1])
          redis.call('expire',KEYS[1], 14400)
          return true
      else
          return false
      end
  EOF

  # Note: 
  # If we zero a counter or reduce a limit, we could go "negative" on a decrement.  
  # Limit minimum at 0 which is self correcting.
  qlimit_decrement_script = <<-EOF
      redis.call('decrby', KEYS[1], ARGV[1])
      local current = tonumber(redis.call('get',KEYS[1]))
      if current < 0 then
          redis.call('set',KEYS[1], 0)
      end
      redis.call('expire',KEYS[1], 14400)
  EOF

  Sidekiq.redis do |conn|
    @@qlimit_increment_sha = conn.script(:load, qlimit_increment_script)
    @@qlimit_decrement_sha = conn.script(:load, qlimit_decrement_script)
  end
end

.qlimit_set(queue, amount = 0) ⇒ Object

Set current count of running jobs in queue



234
235
236
237
238
239
240
# File 'lib/sidekiq/qlimit/qlimit_fetch.rb', line 234

def self.qlimit_set(queue, amount = 0)
  Sidekiq.logger.debug("Qlimit Set: #{queue} => #{amount}")
  Sidekiq.redis do |conn|
    conn.set("qlimit:#{queue}", amount)
    conn.expire("qlimit:#{queue}", 14400)
  end
end

Instance Method Details

#qualifying_queuesObject

Returns an array of queue names that are NOT too busy



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/sidekiq/qlimit/qlimit_fetch.rb', line 84

def qualifying_queues
  # Working copy of @queues list
  allowed_queues = @queues.dup
  
  # Remove any queue which has hit the maximum number of concurrent jobs 
  # NOTE: Could remove a queue for which a job has just finished, but we'll catch that job on the next loop
  @per_queue_limits.each do |k, v|
        Sidekiq.logger.debug("Checking #{k} => #{v}")
        Sidekiq.redis do |conn|
          jobs_in_queue = QlimitFetch.qlimit_get(k)
          if jobs_in_queue.to_i >= v # detected a maximum concurrent case
            allowed_queues.delete("queue:#{k}")
            Sidekiq.logger.debug("Remove: queue:#{k}")
          end
        end
          
  end

  Sidekiq.logger.debug("Allowed Queues: #{allowed_queues}")

  # Follow original sidekiq fetch strategy
  if @strictly_ordered_queues
    allowed_queues
  else
    allowed_queues = allowed_queues.shuffle.uniq
    allowed_queues << TIMEOUT # brpop should Wait X number of seconds before returning nil
    allowed_queues
  end
end

#retrieve_workObject

Returns a “UnitOfWork” from a qualifying queue if available



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/sidekiq/qlimit/qlimit_fetch.rb', line 117

def retrieve_work 
  work_text = Sidekiq.redis { |conn| conn.brpop(*qualifying_queues) }
  work = UnitOfWork.new(*work_text) if work_text

  if work.nil?
    Sidekiq.logger.debug("No Work")
    return
  end
  
  # We hack around the simultaneous zero starting problem by incrementing an expiring counter 
  okay_to_continue = QlimitFetch.qlimit_increment(work.queue_name, @per_queue_limits[work.queue_name])
  Sidekiq.logger.debug("QlimitIncrement Result => #{okay_to_continue}")

  if okay_to_continue
    Sidekiq.logger.debug("Perform #{work}")
    return work
  else
    Sidekiq.logger.debug("Requeue #{work}")
    Sidekiq.redis { |conn| conn.lpush(work.queue, work.job) }
    return nil
  end
end