Module: Resque::Plugins::ConcurrentRestriction
- Included in:
- ConcurrentRestrictionJob
- Defined in:
- lib/resque/plugins/concurrent_restriction/version.rb,
lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb,
lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb
Defined Under Namespace
Constant Summary collapse
- VERSION =
"0.5.9"
Class Attribute Summary collapse
-
.lock_timeout ⇒ Object
optional.
-
.lock_tries ⇒ Object
optional.
-
.reserve_queued_job_attempts ⇒ Object
optional.
-
.restricted_before_queued ⇒ Object
optional.
Class Method Summary collapse
-
.configure {|_self| ... } ⇒ Object
Allows configuring via class accessors.
Instance Method Summary collapse
-
#acquire_lock(lock_key, lock_expiration) ⇒ Object
Acquires a lock using the given key and lock expiration time.
- #clear_runnable(tracking_key, queue) ⇒ Object
-
#concurrent(limit) ⇒ Object
Used by the user in their job class to set the concurrency limit.
-
#concurrent_identifier(*args) ⇒ Object
Allows the user to specify the unique key that identifies a set of jobs that share a concurrency limit.
-
#concurrent_limit ⇒ Object
Used to query what the limit the user has set.
-
#decode(str) ⇒ Object
Decodes the job from the restriction queue.
- #decrement_queue_count(queue, by = 1) ⇒ Object
- #decrement_running_count(tracking_key) ⇒ Object
-
#encode(job) ⇒ Object
Encodes the job into the restriction queue.
- #get_next_runnable(queue) ⇒ Object
- #increment_queue_count(queue, by = 1) ⇒ Object
-
#increment_running_count(tracking_key) ⇒ Object
The value in redis is the number of jobs currently running If we increment past that, we are restricted.
-
#lock_key(tracking_key) ⇒ Object
The key used to acquire a lock so we can operate on multiple redis structures (runnables set, running_count) atomically.
-
#mark_runnable(tracking_key, runnable) ⇒ Object
Keeps track of which jobs are currently runnable, that is the tracking_key should have jobs on some restriction queue and also have less than concurrency_limit jobs running.
-
#next_runnable_job(queue) ⇒ Object
Returns the next job that is runnable.
-
#pop_from_restriction_queue(tracking_key, queue) ⇒ Object
Pops a job from the restriction queue.
-
#push_to_restriction_queue(job, location = :back) ⇒ Object
Pushes the job to the restriction queue.
-
#queue_count_key ⇒ Object
The redis key used to store the aggregate number of jobs in restriction queues by queue name.
- #queue_counts ⇒ Object
- #queues_available(tracking_key) ⇒ Object
-
#release_lock(lock_key, lock_expiration) ⇒ Object
Releases the lock acquired by #acquire_lock.
-
#release_restriction(job) ⇒ Object
Decrements the running_count - to be called at end of job.
-
#reset_restrictions ⇒ Object
Resets everything to be runnable.
- #restricted?(tracking_key) ⇒ Boolean
-
#restriction_queue(tracking_key, queue) ⇒ Object
Grabs the contents of the restriction queue (decoded).
- #restriction_queue_availability_key(tracking_key) ⇒ Object
-
#restriction_queue_key(tracking_key, queue) ⇒ Object
The key for the redis list where restricted jobs for the given resque queue are stored.
-
#restriction_queue_raw(tracking_key, queue) ⇒ Object
Grabs the raw data (undecoded) from the restriction queue.
-
#run_atomically(lock_key, tries = ConcurrentRestriction.lock_tries) ⇒ Object
Uses a lock to ensure that a sequence of redis operations happen atomically We don’t use watch/multi/exec as it doesn’t work in a DistributedRedis setup.
- #runnable?(tracking_key, queue) ⇒ Boolean
-
#runnables(queue = nil) ⇒ Object
Returns the list of tracking_keys that have jobs waiting to run (are not over the concurrency limit).
-
#runnables_key(queue = nil) ⇒ Object
The key to the redis set where we keep a list of runnable tracking_keys.
-
#running_count(tracking_key) ⇒ Object
Returns the number of jobs currently running.
-
#running_count_key(tracking_key) ⇒ Object
The redis key used to store the number of currently running jobs for the restriction_identifier.
- #set_queue_count(queue, count) ⇒ Object
-
#set_running_count(tracking_key, value) ⇒ Object
Returns the number of jobs currently running.
-
#stash_if_restricted(job) ⇒ Object
Pushes the job to restriction queue if it is restricted If the job is within the concurrency limit, thus needs to be run, we keep the running count incremented so that other calls don’t erroneously see a lower value and run their job.
- #stats(extended = false) ⇒ Object
- #tracking_class(tracking_key) ⇒ Object
-
#tracking_key(*args) ⇒ Object
The key that groups all jobs of the same restriction_identifier together so that we can work on any of those jobs if they are runnable Stored in runnables set, and used to build keys for each queue where jobs for those queues are stored.
-
#update_queues_available(tracking_key, queue, action) ⇒ Object
The restriction queues that have data for each tracking key Adds/Removes the queue to the list of queues for that tracking key so we can quickly tell in next_runnable_job if a runnable job exists on a specific restriction queue.
Class Attribute Details
.lock_timeout ⇒ Object
optional
25 26 27 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 25 def lock_timeout @lock_timeout end |
.lock_tries ⇒ Object
optional
25 26 27 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 25 def lock_tries @lock_tries end |
.reserve_queued_job_attempts ⇒ Object
optional
25 26 27 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 25 def reserve_queued_job_attempts @reserve_queued_job_attempts end |
.restricted_before_queued ⇒ Object
optional
25 26 27 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 25 def restricted_before_queued @restricted_before_queued end |
Class Method Details
.configure {|_self| ... } ⇒ Object
Allows configuring via class accessors
35 36 37 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 35 def self.configure yield self end |
Instance Method Details
#acquire_lock(lock_key, lock_expiration) ⇒ Object
Acquires a lock using the given key and lock expiration time
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 319 def acquire_lock(lock_key, lock_expiration) # acquire the lock to work on the restriction queue expiration_time = lock_expiration + 1 acquired_lock = Resque.redis.setnx(lock_key, expiration_time) # If we didn't acquire the lock, check the expiration as described # at http://redis.io/commands/setnx if ! acquired_lock # If expiration time is in the future, then someone else beat us to getting the lock old_expiration_time = Resque.redis.get(lock_key) return false if old_expiration_time.to_i > Time.now.to_i # if expiration time was in the future when we set it, then someone beat us to it old_expiration_time = Resque.redis.getset(lock_key, expiration_time) return false if old_expiration_time.to_i > Time.now.to_i end # expire the lock eventually so we clean up keys - not needed to timeout # lock, just to keep redis clean for locks that aren't being used' Resque.redis.expireat(lock_key, expiration_time + 300) return true end |
#clear_runnable(tracking_key, queue) ⇒ Object
313 314 315 316 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 313 def clear_runnable(tracking_key, queue) Resque.redis.srem(runnables_key(queue), tracking_key) Resque.redis.srem(runnables_key, tracking_key) end |
#concurrent(limit) ⇒ Object
Used by the user in their job class to set the concurrency limit
77 78 79 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 77 def concurrent(limit) @concurrent = limit end |
#concurrent_identifier(*args) ⇒ Object
Allows the user to specify the unique key that identifies a set of jobs that share a concurrency limit. Defaults to the job class name
83 84 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 83 def concurrent_identifier(*args) end |
#concurrent_limit ⇒ Object
Used to query what the limit the user has set
87 88 89 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 87 def concurrent_limit @concurrent ||= 1 end |
#decode(str) ⇒ Object
Decodes the job from the restriction queue
149 150 151 152 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 149 def decode(str) item = Resque.decode(str) Resque::Job.new(item['queue'], item['payload']) if item end |
#decrement_queue_count(queue, by = 1) ⇒ Object
264 265 266 267 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 264 def decrement_queue_count(queue, by=1) value = Resque.redis.hincrby(queue_count_key, queue, -by) return value end |
#decrement_running_count(tracking_key) ⇒ Object
250 251 252 253 254 255 256 257 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 250 def decrement_running_count(tracking_key) count_key = running_count_key(tracking_key) value = Resque.redis.decr(count_key) Resque.redis.set(count_key, 0) if value < 0 restricted = (value >= concurrent_limit) mark_runnable(tracking_key, !restricted) return restricted end |
#encode(job) ⇒ Object
Encodes the job into the restriction queue
143 144 145 146 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 143 def encode(job) item = {:queue => job.queue, :payload => job.payload} Resque.encode(item) end |
#get_next_runnable(queue) ⇒ Object
283 284 285 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 283 def get_next_runnable(queue) Resque.redis.srandmember(runnables_key(queue)) end |
#increment_queue_count(queue, by = 1) ⇒ Object
259 260 261 262 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 259 def increment_queue_count(queue, by=1) value = Resque.redis.hincrby(queue_count_key, queue, by) return value end |
#increment_running_count(tracking_key) ⇒ Object
The value in redis is the number of jobs currently running If we increment past that, we are restricted. Incrementing is only done after the job is cleared for execution due to checking the runnable state, and post increment we setup runnable for future jobs based on the new “restricted” value
242 243 244 245 246 247 248 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 242 def increment_running_count(tracking_key) count_key = running_count_key(tracking_key) value = Resque.redis.incr(count_key) restricted = (value >= concurrent_limit) mark_runnable(tracking_key, !restricted) return restricted end |
#lock_key(tracking_key) ⇒ Object
The key used to acquire a lock so we can operate on multiple redis structures (runnables set, running_count) atomically
93 94 95 96 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 93 def lock_key(tracking_key) parts = tracking_key.split(".") "concurrent.lock.#{parts[2..-1].join('.')}" end |
#mark_runnable(tracking_key, runnable) ⇒ Object
Keeps track of which jobs are currently runnable, that is the tracking_key should have jobs on some restriction queue and also have less than concurrency_limit jobs running
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 296 def mark_runnable(tracking_key, runnable) queues = queues_available(tracking_key) queues.each do |queue| runnable_queues_key = runnables_key(queue) if runnable Resque.redis.sadd(runnable_queues_key, tracking_key) else Resque.redis.srem(runnable_queues_key, tracking_key) end end if runnable Resque.redis.sadd(runnables_key, tracking_key) if queues.size > 0 else Resque.redis.srem(runnables_key, tracking_key) end end |
#next_runnable_job(queue) ⇒ Object
Returns the next job that is runnable
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 409 def next_runnable_job(queue) tracking_key = get_next_runnable(queue) return nil unless tracking_key job = nil lock_key = lock_key(tracking_key) run_atomically(lock_key) do # since we don't have a lock when we get the runnable, # we need to check it again still_runnable = runnable?(tracking_key, queue) if still_runnable klazz = tracking_class(tracking_key) job = klazz.pop_from_restriction_queue(tracking_key, queue) end end return job end |
#pop_from_restriction_queue(tracking_key, queue) ⇒ Object
Pops a job from the restriction queue
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 188 def pop_from_restriction_queue(tracking_key, queue) queue_key = restriction_queue_key(tracking_key, queue) str = Resque.redis.lpop(queue_key) post_pop_size = Resque.redis.llen(queue_key) if post_pop_size == 0 update_queues_available(tracking_key, queue, :remove) clear_runnable(tracking_key, queue) end decrement_queue_count(queue) # increment by one to indicate that we are running increment_running_count(tracking_key) if str decode(str) end |
#push_to_restriction_queue(job, location = :back) ⇒ Object
Pushes the job to the restriction queue
173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 173 def push_to_restriction_queue(job, location=:back) tracking_key = tracking_key(*job.args) case location when :back then Resque.redis.rpush(restriction_queue_key(tracking_key, job.queue), encode(job)) when :front then Resque.redis.lpush(restriction_queue_key(tracking_key, job.queue), encode(job)) else raise "Invalid location to ConcurrentRestriction.push_to_restriction_queue" end increment_queue_count(job.queue) update_queues_available(tracking_key, job.queue, :add) mark_runnable(tracking_key, false) end |
#queue_count_key ⇒ Object
The redis key used to store the aggregate number of jobs in restriction queues by queue name
113 114 115 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 113 def queue_count_key "concurrent.queue_counts" end |
#queue_counts ⇒ Object
269 270 271 272 273 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 269 def queue_counts value = Resque.redis.hgetall(queue_count_key) value = Hash[*value.collect {|k, v| [k, v.to_i] }.flatten] return value end |
#queues_available(tracking_key) ⇒ Object
167 168 169 170 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 167 def queues_available(tracking_key) availability_key = restriction_queue_availability_key(tracking_key) Resque.redis.smembers(availability_key) end |
#release_lock(lock_key, lock_expiration) ⇒ Object
Releases the lock acquired by #acquire_lock
344 345 346 347 348 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 344 def release_lock(lock_key, lock_expiration) # Only delete the lock if the one we created hasn't expired expiration_time = lock_expiration + 1 Resque.redis.del(lock_key) if expiration_time > Time.now.to_i end |
#release_restriction(job) ⇒ Object
Decrements the running_count - to be called at end of job
433 434 435 436 437 438 439 440 441 442 443 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 433 def release_restriction(job) tracking_key = tracking_key(*job.args) lock_key = lock_key(tracking_key) run_atomically(lock_key) do # decrement the count after a job has run decrement_running_count(tracking_key) end end |
#reset_restrictions ⇒ Object
Resets everything to be runnable
446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 446 def reset_restrictions counts_reset = 0 count_keys = Resque.redis.keys("concurrent.count.*") if count_keys.size > 0 count_keys.each_slice(10000) do |key_slice| counts_reset += Resque.redis.del(*key_slice) end end runnable_keys = Resque.redis.keys("concurrent.runnable*") if runnable_keys.size > 0 runnable_keys.each_slice(10000) do |runnable_slice| Resque.redis.del(*runnable_slice) end end Resque.redis.del(queue_count_key) queues_enabled = 0 queue_keys = Resque.redis.keys("concurrent.queue.*") queue_keys.each do |k| len = Resque.redis.llen(k) if len > 0 parts = k.split(".") queue = parts[2] ident = parts[3..-1].join('.') tracking_key = "concurrent.tracking.#{ident}" increment_queue_count(queue, len) update_queues_available(tracking_key, queue, :add) mark_runnable(tracking_key, true) queues_enabled += 1 end end return counts_reset, queues_enabled end |
#restricted?(tracking_key) ⇒ Boolean
230 231 232 233 234 235 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 230 def restricted?(tracking_key) count_key = running_count_key(tracking_key) value = Resque.redis.get(count_key).to_i restricted = (value >= concurrent_limit) return restricted end |
#restriction_queue(tracking_key, queue) ⇒ Object
Grabs the contents of the restriction queue (decoded)
212 213 214 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 212 def restriction_queue(tracking_key, queue) restriction_queue_raw(tracking_key, queue).collect {|s| decode(s) } end |
#restriction_queue_availability_key(tracking_key) ⇒ Object
117 118 119 120 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 117 def restriction_queue_availability_key(tracking_key) parts = tracking_key.split(".") "concurrent.queue_availability.#{parts[2..-1].join('.')}" end |
#restriction_queue_key(tracking_key, queue) ⇒ Object
The key for the redis list where restricted jobs for the given resque queue are stored
106 107 108 109 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 106 def restriction_queue_key(tracking_key, queue) parts = tracking_key.split(".") "concurrent.queue.#{queue}.#{parts[2..-1].join('.')}" end |
#restriction_queue_raw(tracking_key, queue) ⇒ Object
Grabs the raw data (undecoded) from the restriction queue
207 208 209 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 207 def restriction_queue_raw(tracking_key, queue) Array(Resque.redis.lrange(restriction_queue_key(tracking_key, queue), 0, -1)) end |
#run_atomically(lock_key, tries = ConcurrentRestriction.lock_tries) ⇒ Object
Uses a lock to ensure that a sequence of redis operations happen atomically We don’t use watch/multi/exec as it doesn’t work in a DistributedRedis setup
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 353 def run_atomically(lock_key, tries=ConcurrentRestriction.lock_tries) acquired_lock = false exp_backoff = 1 tries.times do lock_expiration = Time.now.to_i + ConcurrentRestriction.lock_timeout if acquire_lock(lock_key, lock_expiration) acquired_lock = true begin yield ensure release_lock(lock_key, lock_expiration) end break else sleep(rand(100) * 0.001 * exp_backoff) exp_backoff *= 2 end end return acquired_lock end |
#runnable?(tracking_key, queue) ⇒ Boolean
279 280 281 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 279 def runnable?(tracking_key, queue) Resque.redis.sismember(runnables_key(queue), tracking_key) end |
#runnables(queue = nil) ⇒ Object
Returns the list of tracking_keys that have jobs waiting to run (are not over the concurrency limit)
288 289 290 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 288 def runnables(queue=nil) Resque.redis.smembers(runnables_key(queue)) end |
#runnables_key(queue = nil) ⇒ Object
The key to the redis set where we keep a list of runnable tracking_keys
137 138 139 140 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 137 def runnables_key(queue=nil) key = ".#{queue}" if queue "concurrent.runnable#{key}" end |
#running_count(tracking_key) ⇒ Object
Returns the number of jobs currently running
217 218 219 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 217 def running_count(tracking_key) Resque.redis.get(running_count_key(tracking_key)).to_i end |
#running_count_key(tracking_key) ⇒ Object
The redis key used to store the number of currently running jobs for the restriction_identifier
100 101 102 103 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 100 def running_count_key(tracking_key) parts = tracking_key.split(".") "concurrent.count.#{parts[2..-1].join('.')}" end |
#set_queue_count(queue, count) ⇒ Object
275 276 277 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 275 def set_queue_count(queue, count) Resque.redis.hset(queue_count_key, queue, count) end |
#set_running_count(tracking_key, value) ⇒ Object
Returns the number of jobs currently running
222 223 224 225 226 227 228 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 222 def set_running_count(tracking_key, value) count_key = running_count_key(tracking_key) Resque.redis.set(count_key, value) restricted = (value > concurrent_limit) mark_runnable(tracking_key, !restricted) return restricted end |
#stash_if_restricted(job) ⇒ Object
Pushes the job to restriction queue if it is restricted If the job is within the concurrency limit, thus needs to be run, we keep the running count incremented so that other calls don’t erroneously see a lower value and run their job. This count gets decremented by call to release_restriction when job completes
381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 381 def stash_if_restricted(job) restricted = nil tracking_key = tracking_key(*job.args) lock_key = lock_key(tracking_key) did_run = run_atomically(lock_key) do restricted = restricted?(tracking_key) if restricted push_to_restriction_queue(job) else increment_running_count(tracking_key) end end # if run_atomically fails to acquire the lock, we need to put # the job back on the queue for processing later and act restricted # upstack so nothing gets run if !did_run restricted = true job.recreate end return restricted end |
#stats(extended = false) ⇒ Object
485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 485 def stats(extended=false) result = {} result[:queues] = queue_counts if extended ident_sizes = {} queue_keys = Resque.redis.keys("concurrent.queue.*") queue_keys.each do |k| parts = k.split(".") ident = parts[3..-1].join(".") queue_name = parts[2] size = Resque.redis.llen(k) ident_sizes[ident] ||= {} ident_sizes[ident][queue_name] ||= 0 ident_sizes[ident][queue_name] += size end count_keys = Resque.redis.keys("concurrent.count.*") running_counts = {} count_keys.each do |k| parts = k.split(".") ident = parts[2..-1].join(".") ident_sizes[ident] ||= {} ident_sizes[ident]["running"] = Resque.redis.get(k).to_i end result[:identifiers] = ident_sizes else result[:identifiers] = {} end lock_keys = Resque.redis.keys("concurrent.lock.*") result[:lock_count] = lock_keys.size runnable_count = Resque.redis.scard(runnables_key) result[:runnable_count] = runnable_count return result end |
#tracking_class(tracking_key) ⇒ Object
132 133 134 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 132 def tracking_class(tracking_key) Resque.constantize(tracking_key.split(".")[2]) end |
#tracking_key(*args) ⇒ Object
The key that groups all jobs of the same restriction_identifier together so that we can work on any of those jobs if they are runnable Stored in runnables set, and used to build keys for each queue where jobs for those queues are stored
126 127 128 129 130 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 126 def tracking_key(*args) id = concurrent_identifier(*args) id = ".#{id}" if id && id.strip.size > 0 "concurrent.tracking.#{self.to_s}#{id}" end |
#update_queues_available(tracking_key, queue, action) ⇒ Object
The restriction queues that have data for each tracking key Adds/Removes the queue to the list of queues for that tracking key so we can quickly tell in next_runnable_job if a runnable job exists on a specific restriction queue
158 159 160 161 162 163 164 165 |
# File 'lib/resque/plugins/concurrent_restriction/concurrent_restriction_job.rb', line 158 def update_queues_available(tracking_key, queue, action) availability_key = restriction_queue_availability_key(tracking_key) case action when :add then Resque.redis.send(:sadd, availability_key, queue) when :remove then Resque.redis.send(:srem, availability_key, queue) else raise "Invalid action to ConcurrentRestriction.track_queue" end end |