Module: Sidekiq::LimitFetch::Queues
Constant Summary collapse
- THREAD_KEY =
:acquired_queues
Instance Method Summary collapse
-
#acquire ⇒ Object
Returns the `queue:`-prefixed names of the queues to fetch from, reusing the saved set when there is one and otherwise acquiring `ordered_queues` through the selector.
- #add(queues) ⇒ Object
- #dynamic? ⇒ Boolean
- #dynamic_exclude ⇒ Object
- #handle(queues) ⇒ Object
-
#namespace ⇒ Object
Returns the memoized Redis namespace prefix (`"<namespace>:"`), or an empty string when the connection has no namespace.
- #ordered_queues ⇒ Object
- #release_except(full_name) ⇒ Object
- #remove(queues) ⇒ Object
-
#start(capsule_or_options) ⇒ Object
Reads the queue list and the `:dynamic`, `:limits`, `:process_limits`, `:blocking` and `:strict` settings from the given capsule or options hash, then applies the limits and blocks to the queues.
- #startup_queue?(queue) ⇒ Boolean
-
#strict_order! ⇒ Object
Deduplicates the queue list in place and redefines `#ordered_queues` to return it unchanged.
- #weighted_order! ⇒ Object
Instance Method Details
#acquire ⇒ Object
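Returns the `queue:`-prefixed names of the queues to fetch from. The saved set is reused when there is one; otherwise `ordered_queues` is acquired through the selector inside `Sidekiq::LimitFetch.redis_retryable`, and the result is saved before being returned.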
49 50 51 52 53 54 55 56 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 49

# Produces the list of queue keys to fetch from.
#
# The value returned by `saved` is reused when it is present; otherwise the
# selector is asked to acquire from `ordered_queues` under `namespace`,
# inside `Sidekiq::LimitFetch.redis_retryable`. Whatever set ends up being
# used is handed to `save` before returning.
#
# @return [Array<String>] each acquired queue name prefixed with "queue:"
def acquire
  acquired = saved || Sidekiq::LimitFetch.redis_retryable do
    selector.acquire(ordered_queues, namespace)
  end

  save acquired
  acquired.map { |name| "queue:#{name}" }
end
#add(queues) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 78

# Appends every queue that is not yet tracked in @queues.
#
# Queues that `startup_queue?` recognises get their process limit and limit
# applied before they are appended; other queues are appended as-is.
#
# @param queues [Array, nil] queue names to add; a falsy value is a no-op
# @return [Array, nil] the given collection, or nil when it was falsy
def add(queues)
  return unless queues

  queues.each do |name|
    unless @queues.include?(name)
      if startup_queue?(name)
        apply_process_limit_to_queue(name)
        apply_limit_to_queue(name)
      end

      @queues << name
    end
  end
end
#dynamic? ⇒ Boolean
66 67 68 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 66

# Reports the dynamic-queues setting stored in @dynamic.
#
# The raw instance variable is returned, so the result is whatever was
# assigned to it (it is not coerced to true/false).
#
# @return [Object] the current value of @dynamic
def dynamic?
  @dynamic
end
#dynamic_exclude ⇒ Object
74 75 76 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 74

# Reader for the exclusion list stored in @dynamic_exclude.
#
# @return [Object] the current value of @dynamic_exclude
def dynamic_exclude
  @dynamic_exclude
end
#handle(queues) ⇒ Object
105 106 107 108 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 105

# Reconciles the tracked queues with the given list: queues missing from
# @queues are passed to `add`, then queues tracked in @queues but absent
# from the given list are passed to `remove`.
#
# @param queues [Array] the queue names that should be tracked
# @return [Object] whatever `remove` returns
def handle(queues)
  missing = queues - @queues
  add(missing)

  # Computed after `add`, exactly as the difference is evaluated in the listing.
  stale = @queues - queues
  remove(stale)
end
#namespace ⇒ Object
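Returns the Redis namespace prefix. The first call asks the `Sidekiq.redis` connection for its namespace and memoizes `"<namespace>:"`, or an empty string when the connection does not respond to `namespace` or has none set.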
125 126 127 128 129 130 131 132 133 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 125

# Memoized Redis key prefix derived from the connection's namespace.
#
# @return [String] "<namespace>:" when the connection yielded by
#   `Sidekiq.redis` responds to `namespace` and returns a truthy value,
#   otherwise an empty string
def namespace
  @namespace ||= Sidekiq.redis do |conn|
    next '' unless conn.respond_to?(:namespace) && conn.namespace

    "#{conn.namespace}:"
  end
end
#ordered_queues ⇒ Object
113 114 115 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 113

# Returns the tracked queue list itself (no copy is made), in its stored
# order.
#
# @return [Array] the @queues array
def ordered_queues
  @queues
end
#release_except(full_name) ⇒ Object
58 59 60 61 62 63 64 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 58

# Releases the queues returned by `restore`, optionally keeping one.
#
# When +full_name+ is given, the part after "queue:" is extracted and that
# queue is dropped from the list before it is passed to the selector's
# `release`, which runs inside `Sidekiq::LimitFetch.redis_retryable`.
#
# @param full_name [String, nil] a "queue:<name>" key to keep, or nil to
#   release everything
# @return [Object] the result of the `redis_retryable` block
def release_except(full_name)
  held = restore

  if full_name
    kept = full_name[/queue:(.*)/, 1]
    held.delete(kept)
  end

  Sidekiq::LimitFetch.redis_retryable { selector.release(held, namespace) }
end
#remove(queues) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 93

# Stops tracking each given queue that is currently in @queues.
#
# For every such queue its limits are cleared, it is deleted from @queues,
# and `Sidekiq::Queue.delete_instance` is called for it. Queues that are
# not tracked are skipped.
#
# @param queues [Array, nil] queue names to remove; a falsy value is a no-op
# @return [Array, nil] the given collection, or nil when it was falsy
def remove(queues)
  return unless queues

  queues.each do |name|
    if @queues.include?(name)
      clear_limits_for_queue(name)
      @queues.delete(name)
      Sidekiq::Queue.delete_instance(name)
    end
  end
end
#start(capsule_or_options) ⇒ Object
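Initializes queue state from configuration. The queue names are read from `config[:queues]` (taking the first element of array entries) and deduplicated; the `:dynamic`, `:limits`, `:process_limits` and `:blocking` settings are stored; strict or weighted ordering is selected from `:strict`; finally the process limits, limits and blocks are applied to the queues.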
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 14

# Initializes queue state from the given configuration and applies the
# configured limits.
#
# @param capsule_or_options [Object] when `Sidekiq::LimitFetch.post_7?` is
#   true, an object whose `config` returns the settings; otherwise the
#   settings hash itself. The settings are read with the keys :queues,
#   :dynamic, :limits, :process_limits, :blocking and :strict.
# @return [Object] whatever `apply_blocks_to_queues` returns
def start(capsule_or_options)
  config = Sidekiq::LimitFetch.post_7? ? capsule_or_options.config : capsule_or_options

  # An entry may be an Array, in which case only its first element is kept
  # as the queue name (presumably a [name, weight] pair -- confirm).
  @queues = config[:queues].map do |queue|
    queue.is_a?(Array) ? queue.first : queue
  end.uniq
  # Snapshot of the initially configured queues, consulted by startup_queue?.
  @startup_queues = @queues.dup

  # :dynamic is either a plain flag or a Hash that may carry an :exclude list.
  if config[:dynamic].is_a? Hash
    @dynamic = true
    @dynamic_exclude = config[:dynamic][:exclude] || []
  else
    @dynamic = config[:dynamic]
    @dynamic_exclude = []
  end

  @limits = config[:limits] || {}
  @process_limits = config[:process_limits] || {}
  @blocks = config[:blocking] || []

  config[:strict] ? strict_order! : weighted_order!

  apply_process_limit_to_queues
  apply_limit_to_queues
  apply_blocks_to_queues
end
#startup_queue?(queue) ⇒ Boolean
70 71 72 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 70

# Tells whether the given queue is in the @startup_queues list.
#
# @param queue [Object] queue name to look up
# @return [Boolean] true when @startup_queues includes +queue+
def startup_queue?(queue)
  @startup_queues.include?(queue)
end
#strict_order! ⇒ Object
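Switches to strict ordering: removes duplicate entries from the queue list in place and redefines `#ordered_queues` so that it returns the list unchanged.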
111 112 113 114 115 116 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 111

# Switches to strict ordering.
#
# Duplicate entries are removed from @queues in place, and `ordered_queues`
# is (re)defined so that it returns @queues unchanged.
#
# @return [Symbol] :ordered_queues, the value of the nested `def`
def strict_order!
  @queues.uniq!

  # Nested definition on purpose: calling this method swaps the
  # implementation of ordered_queues.
  def ordered_queues
    @queues
  end
end
#weighted_order! ⇒ Object
118 119 120 121 122 |
# File 'lib/sidekiq/limit_fetch/queues.rb', line 118

# Switches to weighted ordering.
#
# `ordered_queues` is (re)defined so that each call shuffles a copy of
# @queues and then drops duplicates; @queues itself is left untouched.
#
# @return [Symbol] :ordered_queues, the value of the nested `def`
def weighted_order!
  # Nested definition on purpose: calling this method swaps the
  # implementation of ordered_queues.
  def ordered_queues
    @queues.shuffle.uniq
  end
end