Module: Sidekiq::LimitFetch::Queues
Constant Summary collapse
- THREAD_KEY =
:acquired_queues
Instance Method Summary collapse
- #acquire ⇒ Object
- #add(queues) ⇒ Object
- #dynamic? ⇒ Boolean
- #handle(queues) ⇒ Object
- #namespace ⇒ Object
- #ordered_queues ⇒ Object
- #release_except(full_name) ⇒ Object
- #remove(queues) ⇒ Object
- #start(options) ⇒ Object
- #startup_queue?(queue) ⇒ Boolean
- #strict_order! ⇒ Object
- #weighted_order! ⇒ Object
Instance Method Details
#acquire ⇒ Object
22 23 24 25 26 27 28 29 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 22 def acquire queues = saved queues ||= Sidekiq::LimitFetch.redis_retryable do selector.acquire(ordered_queues, namespace) end save queues queues.map { |it| "queue:#{it}" } end |
#add(queues) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 47 def add(queues) return unless queues queues.each do |queue| unless @queues.include? queue if startup_queue?(queue) apply_process_limit_to_queue(queue) apply_limit_to_queue(queue) end @queues.push queue end end end |
#dynamic? ⇒ Boolean
39 40 41 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 39 def dynamic? @dynamic end |
#handle(queues) ⇒ Object
72 73 74 75 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 72 def handle(queues) add(queues - @queues) remove(@queues - queues) end |
#namespace ⇒ Object
86 87 88 89 90 91 92 93 94 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 86 def namespace @namespace ||= Sidekiq.redis do |it| if it.respond_to?(:namespace) and it.namespace "#{it.namespace}:" else '' end end end |
#ordered_queues ⇒ Object
79 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 79 def ordered_queues; @queues end |
#release_except(full_name) ⇒ Object
31 32 33 34 35 36 37 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 31 def release_except(full_name) queues = restore queues.delete full_name[/queue:(.*)/, 1] if full_name Sidekiq::LimitFetch.redis_retryable do selector.release queues, namespace end end |
#remove(queues) ⇒ Object
61 62 63 64 65 66 67 68 69 70 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 61 def remove(queues) return unless queues queues.each do |queue| if @queues.include? queue clear_limits_for_queue(queue) @queues.delete queue Sidekiq::Queue.delete_instance(queue) end end end |
#start(options) ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 6 def start() @queues = [:queues] @startup_queues = [:queues].dup @dynamic = [:dynamic] @limits = [:limits] || {} @process_limits = [:process_limits] || {} @blocks = [:blocking] || [] [:strict] ? strict_order! : weighted_order! apply_process_limit_to_queues apply_limit_to_queues apply_blocks_to_queues end |
#startup_queue?(queue) ⇒ Boolean
43 44 45 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 43 def startup_queue?(queue) @startup_queues.include?(queue) end |
#strict_order! ⇒ Object
77 78 79 80 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 77 def strict_order! @queues.uniq! def ordered_queues; @queues end end |
#weighted_order! ⇒ Object
82 83 84 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 82 def weighted_order! def ordered_queues; @queues.shuffle.uniq end end |