Module: Sidekiq::LimitFetch::Queues

Extended by:
Queues
Included in:
Queues
Defined in:
lib/sidekiq/limit_fetch/queues.rb

Constant Summary collapse

THREAD_KEY =
:acquired_queues

Instance Method Summary collapse

Instance Method Details

#acquireObject



22
23
24
25
26
27
28
29
# File 'lib/sidekiq/limit_fetch/queues.rb', line 22

def acquire
  queues = saved
  queues ||= Sidekiq::LimitFetch.redis_retryable do
    selector.acquire(ordered_queues, namespace)
  end
  save queues
  queues.map { |it| "queue:#{it}" }
end

#add(queues) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/sidekiq/limit_fetch/queues.rb', line 47

def add(queues)
  return unless queues
  queues.each do |queue|
    unless @queues.include? queue
      if startup_queue?(queue)
        apply_process_limit_to_queue(queue)
        apply_limit_to_queue(queue)
      end

      @queues.push queue
    end
  end
end

#dynamic?Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/sidekiq/limit_fetch/queues.rb', line 39

def dynamic?
  @dynamic
end

#handle(queues) ⇒ Object



72
73
74
75
# File 'lib/sidekiq/limit_fetch/queues.rb', line 72

def handle(queues)
  add(queues - @queues)
  remove(@queues - queues)
end

#namespaceObject



86
87
88
89
90
91
92
93
94
# File 'lib/sidekiq/limit_fetch/queues.rb', line 86

def namespace
  @namespace ||= Sidekiq.redis do |it|
    if it.respond_to?(:namespace) and it.namespace
      "#{it.namespace}:"
    else
      ''
    end
  end
end

#ordered_queuesObject



79
# File 'lib/sidekiq/limit_fetch/queues.rb', line 79

def ordered_queues; @queues end

#release_except(full_name) ⇒ Object



31
32
33
34
35
36
37
# File 'lib/sidekiq/limit_fetch/queues.rb', line 31

def release_except(full_name)
  queues = restore
  queues.delete full_name[/queue:(.*)/, 1] if full_name
  Sidekiq::LimitFetch.redis_retryable do
    selector.release queues, namespace
  end
end

#remove(queues) ⇒ Object



61
62
63
64
65
66
67
68
69
70
# File 'lib/sidekiq/limit_fetch/queues.rb', line 61

def remove(queues)
  return unless queues
  queues.each do |queue|
    if @queues.include? queue
      clear_limits_for_queue(queue)
      @queues.delete queue
      Sidekiq::Queue.delete_instance(queue)
    end
  end
end

#start(options) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/sidekiq/limit_fetch/queues.rb', line 6

def start(options)
  @queues         = options[:queues]
  @startup_queues = options[:queues].dup
  @dynamic        = options[:dynamic]

  @limits         = options[:limits] || {}
  @process_limits = options[:process_limits] || {}
  @blocks         = options[:blocking] || []

  options[:strict] ? strict_order! : weighted_order!

  apply_process_limit_to_queues
  apply_limit_to_queues
  apply_blocks_to_queues
end

#startup_queue?(queue) ⇒ Boolean

Returns:

  • (Boolean)


43
44
45
# File 'lib/sidekiq/limit_fetch/queues.rb', line 43

def startup_queue?(queue)
  @startup_queues.include?(queue)
end

#strict_order!Object



77
78
79
80
# File 'lib/sidekiq/limit_fetch/queues.rb', line 77

def strict_order!
  @queues.uniq!
  def ordered_queues; @queues end
end

#weighted_order!Object



82
83
84
# File 'lib/sidekiq/limit_fetch/queues.rb', line 82

def weighted_order!
  def ordered_queues; @queues.shuffle.uniq end
end