Module: Resque::Plugins::ConcurrentRestriction

Included in:
ConcurrentRestrictionJob
Defined in:
lib/resque/plugins/concurrent_restriction/version.rb,
lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb,
lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb

Defined Under Namespace

Modules: Job, Worker

Constant Summary collapse

VERSION =
"0.5.9"

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Class Attribute Details

.lock_timeoutObject

optional



25
26
27
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 25

# Reader for the +lock_timeout+ setting stored in @lock_timeout.
# run_atomically adds this value to the current epoch seconds to compute
# the expiration of a lock. Yields nil until a value has been assigned.
def lock_timeout
  return @lock_timeout
end

.lock_triesObject

optional



25
26
27
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 25

# Reader for the +lock_tries+ setting stored in @lock_tries.
# run_atomically uses it as the default number of attempts made to acquire
# a lock. Yields nil until a value has been assigned.
def lock_tries
  return @lock_tries
end

.reserve_queued_job_attemptsObject

optional



25
26
27
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 25

# Reader for the +reserve_queued_job_attempts+ setting stored in
# @reserve_queued_job_attempts. Yields nil until a value has been assigned.
# NOTE(review): presumably bounds how many attempts a worker makes when
# reserving a queued job - confirm against the worker extension.
def reserve_queued_job_attempts
  return @reserve_queued_job_attempts
end

.restricted_before_queuedObject

optional



25
26
27
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 25

# Reader for the +restricted_before_queued+ setting stored in
# @restricted_before_queued. Yields nil until a value has been assigned.
# NOTE(review): presumably selects whether restriction queues are checked
# before the regular queues - confirm against the worker extension.
def restricted_before_queued
  return @restricted_before_queued
end

Class Method Details

.configure {|_self| ... } ⇒ Object

Allows configuring via class accessors

Yields:

  • (_self)

Yield Parameters:



35
36
37
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 35

# Configuration entry point: hands the receiver to the caller's block so
# settings can be assigned through its accessors.
#
# Yields the receiver itself; returns whatever the block returns.
# A block is required (calling without one raises LocalJumpError).
def self.configure
  yield(self)
end

Instance Method Details

#acquire_lock(lock_key, lock_expiration) ⇒ Object

Acquires a lock using the given key and lock expiration time



319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 319

# Tries to take the lock stored under +lock_key+, following the SETNX /
# GETSET locking pattern described at http://redis.io/commands/setnx.
#
# lock_key        - redis key holding the lock
# lock_expiration - epoch seconds at which the lock should lapse; the
#                   value stored in redis is one second later
#
# Returns true when the lock was obtained, false when another holder's
# lock is still valid.
def acquire_lock(lock_key, lock_expiration)
  redis = Resque.redis
  expires_at = lock_expiration + 1

  unless redis.setnx(lock_key, expires_at)
    # The key already exists: a stored expiration in the future means the
    # current holder's lock is still valid.
    return false if redis.get(lock_key).to_i > Time.now.to_i

    # The stored lock looked stale, so swap in ours. If the value we
    # replaced was in the future, another client renewed it first.
    previous = redis.getset(lock_key, expires_at)
    return false if previous.to_i > Time.now.to_i
  end

  # Housekeeping only: let redis drop unused lock keys eventually. This
  # expiry is not what times out the lock itself.
  redis.expireat(lock_key, expires_at + 300)

  true
end

#clear_runnable(tracking_key, queue) ⇒ Object



313
314
315
316
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 313

# Removes +tracking_key+ from the runnable set of the given +queue+ and
# from the global runnable set.
#
# Returns the result of the second (global) SREM call.
def clear_runnable(tracking_key, queue)
  redis = Resque.redis
  per_queue_set = runnables_key(queue)
  redis.srem(per_queue_set, tracking_key)
  redis.srem(runnables_key, tracking_key)
end

#concurrent(limit) ⇒ Object

Used by the user in their job class to set the concurrency limit



77
78
79
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 77

# Declares the concurrency limit for the job class by storing it in
# @concurrent (read back through concurrent_limit).
#
# limit - the maximum number of jobs allowed to run at once
#
# Returns the limit that was stored.
def concurrent(limit)
  @concurrent = limit
  return @concurrent
end

#concurrent_identifier(*args) ⇒ Object

Allows the user to specify the unique key that identifies a set of jobs that share a concurrency limit. Defaults to the job class name



83
84
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 83

# Hook for job classes to supply an identifier that groups jobs sharing a
# concurrency limit. This default ignores the job arguments and returns
# nil, in which case tracking_key uses only the class name.
def concurrent_identifier(*args)
  nil
end

#concurrent_limitObject

Used to query the concurrency limit the user has set



87
88
89
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 87

# Returns the concurrency limit set through +concurrent+. When none has
# been set, the limit is initialised to 1 and that value is returned.
def concurrent_limit
  @concurrent = 1 unless @concurrent
  @concurrent
end

#decode(str) ⇒ Object

Decodes the job from the restriction queue



149
150
151
152
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 149

# Rebuilds a Resque::Job from an entry stored in a restriction queue.
#
# str - the encoded entry as produced by +encode+
#
# Returns a Resque::Job built from the entry's 'queue' and 'payload'
# values, or nil when Resque.decode yields nothing.
def decode(str)
  entry = Resque.decode(str)
  return nil unless entry

  Resque::Job.new(entry['queue'], entry['payload'])
end

#decrement_queue_count(queue, by = 1) ⇒ Object



264
265
266
267
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 264

# Lowers the aggregate restricted-job counter kept for +queue+ in the
# queue_count_key hash.
#
# queue - resque queue name (the hash field)
# by    - amount to subtract (default 1)
#
# Returns the counter value after the decrement, as reported by HINCRBY.
def decrement_queue_count(queue, by=1)
  Resque.redis.hincrby(queue_count_key, queue, -by)
end

#decrement_running_count(tracking_key) ⇒ Object



250
251
252
253
254
255
256
257
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 250

# Lowers the running-job counter for +tracking_key+ by one and refreshes
# the runnable state to match.
#
# Returns true when the key is still restricted (the decremented count is
# at or above concurrent_limit), false otherwise.
def decrement_running_count(tracking_key)
  counter_key = running_count_key(tracking_key)
  remaining = Resque.redis.decr(counter_key)

  # Never leave a negative count stored in redis.
  Resque.redis.set(counter_key, 0) if remaining < 0

  still_restricted = remaining >= concurrent_limit
  mark_runnable(tracking_key, !still_restricted)
  still_restricted
end

#encode(job) ⇒ Object

Encodes the job into the restriction queue



143
144
145
146
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 143

# Serialises a job for storage in a restriction queue.
#
# job - an object responding to +queue+ and +payload+
#
# Returns the result of Resque.encode for a hash holding the job's queue
# and payload.
def encode(job)
  Resque.encode(:queue => job.queue, :payload => job.payload)
end

#get_next_runnable(queue) ⇒ Object



283
284
285
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 283

# Picks a random member of the runnable set for +queue+.
#
# Returns a tracking key, or whatever SRANDMEMBER reports for an empty set.
def get_next_runnable(queue)
  candidates_key = runnables_key(queue)
  Resque.redis.srandmember(candidates_key)
end

#increment_queue_count(queue, by = 1) ⇒ Object



259
260
261
262
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 259

# Raises the aggregate restricted-job counter kept for +queue+ in the
# queue_count_key hash.
#
# queue - resque queue name (the hash field)
# by    - amount to add (default 1)
#
# Returns the counter value after the increment, as reported by HINCRBY.
def increment_queue_count(queue, by=1)
  Resque.redis.hincrby(queue_count_key, queue, by)
end

#increment_running_count(tracking_key) ⇒ Object

The value in redis is the number of jobs currently running. If we increment past that, we are restricted. Incrementing is only done after the job is cleared for execution due to checking the runnable state, and post increment we set up runnable for future jobs based on the new “restricted” value.



242
243
244
245
246
247
248
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 242

# Raises the running-job counter for +tracking_key+ by one and refreshes
# the runnable state to match the new count.
#
# Returns true when the incremented count has reached concurrent_limit
# (further jobs are restricted), false otherwise.
def increment_running_count(tracking_key)
  now_running = Resque.redis.incr(running_count_key(tracking_key))
  at_limit = now_running >= concurrent_limit
  mark_runnable(tracking_key, !at_limit)
  at_limit
end

#lock_key(tracking_key) ⇒ Object

The key used to acquire a lock so we can operate on multiple redis structures (runnables set, running_count) atomically



93
94
95
96
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 93

# Builds the redis key of the lock guarding +tracking_key+.
#
# The first two dot-separated segments of the tracking key are dropped and
# the remainder is appended to "concurrent.lock.".
def lock_key(tracking_key)
  ident = tracking_key.split(".")[2..-1].join(".")
  "concurrent.lock.#{ident}"
end

#mark_runnable(tracking_key, runnable) ⇒ Object

Keeps track of which jobs are currently runnable, that is, the tracking_key should have jobs on some restriction queue and also have fewer than concurrent_limit jobs running



296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 296

# Adds +tracking_key+ to, or removes it from, the runnable sets.
#
# The per-queue runnable set of every queue returned by queues_available
# is updated, followed by the global runnable set. When marking runnable,
# the global set is only touched if at least one queue is available; when
# marking not runnable, the key is always removed from the global set.
#
# tracking_key - the tracking key to update
# runnable     - truthy to add the key, falsy to remove it
def mark_runnable(tracking_key, runnable)
  queues = queues_available(tracking_key)

  if runnable
    queues.each { |queue| Resque.redis.sadd(runnables_key(queue), tracking_key) }
    Resque.redis.sadd(runnables_key, tracking_key) unless queues.empty?
  else
    queues.each { |queue| Resque.redis.srem(runnables_key(queue), tracking_key) }
    Resque.redis.srem(runnables_key, tracking_key)
  end
end

#next_runnable_job(queue) ⇒ Object

Returns the next job that is runnable



409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 409

# Fetches the next job that may run from the restriction queues of +queue+.
#
# A candidate tracking key is chosen without holding a lock, so once the
# lock is held the key's runnable state is checked again before a job is
# popped through the key's job class.
#
# Returns the popped job, or nil when nothing is runnable, the candidate
# is no longer runnable, or the lock could not be obtained.
def next_runnable_job(queue)
  tracking_key = get_next_runnable(queue)
  return nil unless tracking_key

  job = nil
  run_atomically(lock_key(tracking_key)) do
    if runnable?(tracking_key, queue)
      job = tracking_class(tracking_key).pop_from_restriction_queue(tracking_key, queue)
    end
  end
  job
end

#pop_from_restriction_queue(tracking_key, queue) ⇒ Object

Pops a job from the restriction queue



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 188

# Pops the next job from the restriction queue of +tracking_key+ on +queue+.
#
# When the list is empty after the pop, the queue is removed from the
# tracking key's available queues and the key is cleared from the runnable
# sets. When an entry was actually popped, the aggregate queue count is
# decremented and the running count incremented (the job is about to run).
#
# tracking_key - the tracking key whose restriction queue is read
# queue        - the resque queue name
#
# Returns the decoded job, or nil when the restriction queue was empty.
def pop_from_restriction_queue(tracking_key, queue)
  queue_key = restriction_queue_key(tracking_key, queue)
  str = Resque.redis.lpop(queue_key)
  post_pop_size = Resque.redis.llen(queue_key)

  if post_pop_size == 0
    update_queues_available(tracking_key, queue, :remove)
    clear_runnable(tracking_key, queue)
  end

  # Nothing was popped: leave both counters alone so the aggregate queue
  # count cannot drift below the real number of stored jobs.
  return nil unless str

  decrement_queue_count(queue)

  # increment by one to indicate that we are running
  increment_running_count(tracking_key)

  decode(str)
end

#push_to_restriction_queue(job, location = :back) ⇒ Object

Pushes the job to the restriction queue



173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 173

# Stores +job+ in the restriction queue for its tracking key and queue.
#
# job      - the job to stash; its args determine the tracking key
# location - :back (default) appends with RPUSH, :front prepends with LPUSH
#
# After the push the aggregate queue count is incremented, the queue is
# recorded as available for the tracking key, and the key is marked as not
# runnable.
#
# Raises RuntimeError for any other +location+.
def push_to_restriction_queue(job, location=:back)
  tracking_key = tracking_key(*job.args)

  unless [:back, :front].include?(location)
    raise "Invalid location to ConcurrentRestriction.push_to_restriction_queue"
  end

  if :back == location
    Resque.redis.rpush(restriction_queue_key(tracking_key, job.queue), encode(job))
  else
    Resque.redis.lpush(restriction_queue_key(tracking_key, job.queue), encode(job))
  end

  increment_queue_count(job.queue)
  update_queues_available(tracking_key, job.queue, :add)
  mark_runnable(tracking_key, false)
end

#queue_count_keyObject

The redis key used to store the aggregate number of jobs in restriction queues by queue name



113
114
115
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 113

# Name of the redis hash that maps each resque queue name to the number of
# jobs currently held in restriction queues for it.
def queue_count_key
  return "concurrent.queue_counts"
end

#queue_countsObject



269
270
271
272
273
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 269

# Reads the aggregate restriction-queue sizes from redis.
#
# Returns a Hash mapping each queue name found in the queue_count_key hash
# to its count converted to an Integer.
def queue_counts
  raw_counts = Resque.redis.hgetall(queue_count_key)
  raw_counts.each_with_object({}) do |(queue, count), totals|
    totals[queue] = count.to_i
  end
end

#queues_available(tracking_key) ⇒ Object



167
168
169
170
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 167

# Lists the queues recorded as holding restricted jobs for +tracking_key+.
#
# Returns the members of the tracking key's queue-availability set.
def queues_available(tracking_key)
  Resque.redis.smembers(restriction_queue_availability_key(tracking_key))
end

#release_lock(lock_key, lock_expiration) ⇒ Object

Releases the lock acquired by #acquire_lock



344
345
346
347
348
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 344

# Gives up a lock previously taken with acquire_lock.
#
# lock_key        - redis key holding the lock
# lock_expiration - the same expiration that was passed to acquire_lock
#
# The key is deleted only while our own lock has not yet expired; once it
# has lapsed the key is left untouched.
#
# Returns the result of DEL, or nil when nothing was deleted.
def release_lock(lock_key, lock_expiration)
  still_ours = (lock_expiration + 1) > Time.now.to_i
  return nil unless still_ours

  Resque.redis.del(lock_key)
end

#release_restriction(job) ⇒ Object

Decrements the running_count - to be called at end of job



433
434
435
436
437
438
439
440
441
442
443
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 433

# Gives back the concurrency slot held by +job+; meant to be called once
# the job has finished.
#
# The running count of the job's tracking key is decremented while holding
# that key's lock.
#
# Returns the result of run_atomically (whether the lock was obtained).
def release_restriction(job)
  tracking_key = tracking_key(*job.args)

  run_atomically(lock_key(tracking_key)) do
    # decrement the count after a job has run
    decrement_running_count(tracking_key)
  end
end

#reset_restrictionsObject

Resets everything to be runnable



446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 446

# Rebuilds the restriction bookkeeping so every stashed job is runnable.
#
# Steps, in order:
# * delete all running-count keys ("concurrent.count.*")
# * delete all runnable sets ("concurrent.runnable*")
# * delete the aggregate queue-count hash
# * for every non-empty restriction queue ("concurrent.queue.*"), re-add
#   its length to the queue count, record the queue as available for its
#   tracking key and mark that tracking key runnable
#
# Returns a two element array: the number of count keys deleted and the
# number of non-empty restriction queues re-enabled.
def reset_restrictions
  redis = Resque.redis

  # Keys are deleted in batches of 10000.
  counts_reset = 0
  redis.keys("concurrent.count.*").each_slice(10000) do |batch|
    counts_reset += redis.del(*batch)
  end

  redis.keys("concurrent.runnable*").each_slice(10000) do |batch|
    redis.del(*batch)
  end

  redis.del(queue_count_key)

  queues_enabled = 0
  redis.keys("concurrent.queue.*").each do |list_key|
    len = redis.llen(list_key)
    next unless len > 0

    # list_key looks like "concurrent.queue.<queue>.<identifier...>"
    segments = list_key.split(".")
    queue = segments[2]
    tracking_key = "concurrent.tracking.#{segments[3..-1].join('.')}"

    increment_queue_count(queue, len)
    update_queues_available(tracking_key, queue, :add)
    mark_runnable(tracking_key, true)
    queues_enabled += 1
  end

  [counts_reset, queues_enabled]
end

#restricted?(tracking_key) ⇒ Boolean

Returns:

  • (Boolean)


230
231
232
233
234
235
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 230

# Tells whether +tracking_key+ is at its concurrency limit.
#
# Returns true when the running count stored in redis (0 when absent) is
# greater than or equal to concurrent_limit.
def restricted?(tracking_key)
  running = Resque.redis.get(running_count_key(tracking_key)).to_i
  running >= concurrent_limit
end

#restriction_queue(tracking_key, queue) ⇒ Object

Grabs the contents of the restriction queue (decoded)



212
213
214
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 212

# Returns the contents of the restriction queue for +tracking_key+ on
# +queue+, with every raw entry passed through +decode+.
def restriction_queue(tracking_key, queue)
  raw_entries = restriction_queue_raw(tracking_key, queue)
  raw_entries.map { |entry| decode(entry) }
end

#restriction_queue_availability_key(tracking_key) ⇒ Object



117
118
119
120
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 117

# Builds the redis key of the set listing the queues that hold restricted
# jobs for +tracking_key+.
#
# The first two dot-separated segments of the tracking key are dropped and
# the remainder is appended to "concurrent.queue_availability.".
def restriction_queue_availability_key(tracking_key)
  ident = tracking_key.split(".")[2..-1].join(".")
  "concurrent.queue_availability.#{ident}"
end

#restriction_queue_key(tracking_key, queue) ⇒ Object

The key for the redis list where restricted jobs for the given resque queue are stored



106
107
108
109
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 106

# Builds the redis key of the list holding restricted jobs of
# +tracking_key+ for the resque queue +queue+.
#
# The key is "concurrent.queue.<queue>." followed by the tracking key
# minus its first two dot-separated segments.
def restriction_queue_key(tracking_key, queue)
  ident = tracking_key.split(".")[2..-1].join(".")
  "concurrent.queue.#{queue}.#{ident}"
end

#restriction_queue_raw(tracking_key, queue) ⇒ Object

Grabs the raw data (undecoded) from the restriction queue



207
208
209
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 207

# Returns every entry, still encoded, of the restriction queue for
# +tracking_key+ on +queue+. The LRANGE result is always coerced to an
# Array.
def restriction_queue_raw(tracking_key, queue)
  list_key = restriction_queue_key(tracking_key, queue)
  entries = Resque.redis.lrange(list_key, 0, -1)
  Array(entries)
end

#run_atomically(lock_key, tries = ConcurrentRestriction.lock_tries) ⇒ Object

Uses a lock to ensure that a sequence of redis operations happens atomically. We don’t use watch/multi/exec as it doesn’t work in a DistributedRedis setup.



353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 353

# Runs the given block while holding the lock stored under +lock_key+, so
# that a sequence of redis operations is not interleaved with other
# lock holders.
#
# lock_key - redis key of the lock to take
# tries    - how many attempts to make (default
#            ConcurrentRestriction.lock_tries)
#
# Each attempt uses a lock expiring ConcurrentRestriction.lock_timeout
# seconds from now. After a failed attempt the method sleeps for a random
# interval whose upper bound doubles with every failure. The lock is
# released even when the block raises.
#
# Returns true when the lock was obtained (the block ran), false when all
# attempts failed (the block never ran).
def run_atomically(lock_key, tries=ConcurrentRestriction.lock_tries)
  backoff = 1

  tries.times do
    expires_at = Time.now.to_i + ConcurrentRestriction.lock_timeout

    unless acquire_lock(lock_key, expires_at)
      sleep(rand(100) * 0.001 * backoff)
      backoff *= 2
      next
    end

    begin
      yield
    ensure
      release_lock(lock_key, expires_at)
    end
    return true
  end

  false
end

#runnable?(tracking_key, queue) ⇒ Boolean

Returns:

  • (Boolean)


279
280
281
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 279

# Tells whether +tracking_key+ is currently a member of the runnable set
# for +queue+.
def runnable?(tracking_key, queue)
  runnable_set = runnables_key(queue)
  Resque.redis.sismember(runnable_set, tracking_key)
end

#runnables(queue = nil) ⇒ Object

Returns the list of tracking_keys that have jobs waiting to run (are not over the concurrency limit)



288
289
290
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 288

# Lists the tracking keys in a runnable set.
#
# queue - when given, the runnable set of that queue is read; otherwise
#         the global runnable set is read
#
# Returns the members of the set.
def runnables(queue=nil)
  runnable_set = runnables_key(queue)
  Resque.redis.smembers(runnable_set)
end

#runnables_key(queue = nil) ⇒ Object

The key to the redis set where we keep a list of runnable tracking_keys



137
138
139
140
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 137

# Builds the redis key of a runnable set.
#
# queue - when truthy, the key of that queue's runnable set
#         ("concurrent.runnable.<queue>") is returned; otherwise the key of
#         the global runnable set ("concurrent.runnable")
def runnables_key(queue=nil)
  queue ? "concurrent.runnable.#{queue}" : "concurrent.runnable"
end

#running_count(tracking_key) ⇒ Object

Returns the number of jobs currently running



217
218
219
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 217

# Returns the running-job counter stored in redis for +tracking_key+ as an
# Integer (0 when the counter key is absent).
def running_count(tracking_key)
  stored = Resque.redis.get(running_count_key(tracking_key))
  stored.to_i
end

#running_count_key(tracking_key) ⇒ Object

The redis key used to store the number of currently running jobs for the restriction_identifier



100
101
102
103
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 100

# Builds the redis key of the running-job counter for +tracking_key+.
#
# The first two dot-separated segments of the tracking key are dropped and
# the remainder is appended to "concurrent.count.".
def running_count_key(tracking_key)
  ident = tracking_key.split(".")[2..-1].join(".")
  "concurrent.count.#{ident}"
end

#set_queue_count(queue, count) ⇒ Object



275
276
277
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 275

# Overwrites the aggregate restricted-job counter for +queue+ in the
# queue_count_key hash with +count+.
#
# Returns the result of HSET.
def set_queue_count(queue, count)
  counts_hash = queue_count_key
  Resque.redis.hset(counts_hash, queue, count)
end

#set_running_count(tracking_key, value) ⇒ Object

Sets the number of jobs currently running and updates the runnable state accordingly



222
223
224
225
226
227
228
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 222

# Overwrites the running-job counter for +tracking_key+ and refreshes the
# runnable state to match.
#
# tracking_key - the tracking key whose counter is set
# value        - the new number of running jobs
#
# Returns true when the key is restricted at the new value (the value is
# at or above concurrent_limit), false otherwise.
def set_running_count(tracking_key, value)
  count_key = running_count_key(tracking_key)
  Resque.redis.set(count_key, value)

  # Reaching the limit already restricts the key; this uses the same >=
  # test as restricted? and increment/decrement_running_count.
  restricted = (value >= concurrent_limit)
  mark_runnable(tracking_key, !restricted)
  return restricted
end

#stash_if_restricted(job) ⇒ Object

Pushes the job to the restriction queue if it is restricted. If the job is within the concurrency limit, and thus needs to be run, we keep the running count incremented so that other calls don’t erroneously see a lower value and run their job. This count gets decremented by the call to release_restriction when the job completes.



381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 381

# Decides, under the tracking key's lock, whether +job+ may run now.
#
# When the key is restricted the job is pushed to its restriction queue.
# Otherwise the running count is incremented and left incremented so that
# concurrent callers see the slot as taken.
#
# When the lock cannot be obtained the job is recreated (put back for
# later processing) and reported as restricted so nothing runs upstack.
#
# Returns true when the job must not run now, false when it may run.
def stash_if_restricted(job)
  restricted = nil
  tracking_key = tracking_key(*job.args)

  lock_obtained = run_atomically(lock_key(tracking_key)) do
    restricted = restricted?(tracking_key)
    if restricted
      push_to_restriction_queue(job)
    else
      increment_running_count(tracking_key)
    end
  end

  return restricted if lock_obtained

  # No lock: requeue the job and act as if it were restricted.
  job.recreate
  true
end

#stats(extended = false) ⇒ Object



485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 485

# Collects statistics about the restriction bookkeeping held in redis.
#
# extended - when truthy, per-identifier details are gathered by scanning
#            the restriction-queue and running-count keys
#
# Returns a Hash with these keys:
#   :queues         - aggregate restricted-job count per queue
#   :identifiers    - per identifier, a hash of queue name => restriction
#                     queue length plus "running" => running count; empty
#                     unless +extended+
#   :lock_count     - number of "concurrent.lock.*" keys present
#   :runnable_count - size of the global runnable set
def stats(extended=false)
  redis = Resque.redis
  result = { :queues => queue_counts }

  ident_sizes = {}
  if extended
    # keys look like "concurrent.queue.<queue>.<identifier...>"
    redis.keys("concurrent.queue.*").each do |list_key|
      segments = list_key.split(".")
      ident = segments[3..-1].join(".")
      queue_name = segments[2]
      size = redis.llen(list_key)
      per_ident = (ident_sizes[ident] ||= {})
      per_ident[queue_name] = (per_ident[queue_name] || 0) + size
    end

    # keys look like "concurrent.count.<identifier...>"
    redis.keys("concurrent.count.*").each do |counter_key|
      ident = counter_key.split(".")[2..-1].join(".")
      per_ident = (ident_sizes[ident] ||= {})
      per_ident["running"] = redis.get(counter_key).to_i
    end
  end
  result[:identifiers] = ident_sizes

  result[:lock_count] = redis.keys("concurrent.lock.*").size
  result[:runnable_count] = redis.scard(runnables_key)

  result
end

#tracking_class(tracking_key) ⇒ Object



132
133
134
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 132

# Resolves the job class named in +tracking_key+.
#
# The third dot-separated segment of the tracking key is taken as the
# class name and handed to Resque.constantize.
def tracking_class(tracking_key)
  class_name = tracking_key.split(".")[2]
  Resque.constantize(class_name)
end

#tracking_key(*args) ⇒ Object

The key that groups all jobs of the same restriction_identifier together so that we can work on any of those jobs if they are runnable. Stored in the runnables set, and used to build keys for each queue where jobs for those queues are stored.



126
127
128
129
130
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 126

# Builds the key that groups all jobs sharing one concurrency limit.
#
# args - the job arguments, forwarded to concurrent_identifier
#
# Returns "concurrent.tracking.<receiver>" followed by ".<identifier>" when
# concurrent_identifier yields a string that is non-empty after stripping.
def tracking_key(*args)
  id = concurrent_identifier(*args)
  suffix = (id && id.strip.size > 0) ? ".#{id}" : id
  "concurrent.tracking.#{self.to_s}#{suffix}"
end

#update_queues_available(tracking_key, queue, action) ⇒ Object

The restriction queues that have data for each tracking key. Adds/removes the queue to/from the list of queues for that tracking key so we can quickly tell in next_runnable_job if a runnable job exists on a specific restriction queue.



158
159
160
161
162
163
164
165
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 158

# Records or forgets that +queue+ holds restricted jobs for +tracking_key+
# by updating the tracking key's queue-availability set.
#
# tracking_key - the tracking key whose availability set is updated
# queue        - the resque queue name
# action       - :add to insert the queue, :remove to delete it
#
# Raises RuntimeError for any other +action+.
def update_queues_available(tracking_key, queue, action)
  availability_key = restriction_queue_availability_key(tracking_key)
  case action
    when :add then Resque.redis.sadd(availability_key, queue)
    when :remove then Resque.redis.srem(availability_key, queue)
    else raise "Invalid action to ConcurrentRestriction.update_queues_available"
  end
end