Module: UmbrellioUtils::Jobs

Extended by:
Jobs
Included in:
Jobs
Defined in:
lib/umbrellio_utils/jobs.rb

Defined Under Namespace

Classes: Capsule, Entry, Queue, Worker

Instance Method Summary collapse

Instance Method Details

#capsulesObject



15
16
17
# File 'lib/umbrellio_utils/jobs.rb', line 15

def capsules
  @capsules ||= []
end

#capsules_for(worker, max_concurrency) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/umbrellio_utils/jobs.rb', line 57

def capsules_for(worker, max_concurrency)
  capsules = self.capsules.select do |capsule|
    next unless capsule.worker.to_s == worker.underscore.to_s
    next unless queues.any? { |queue| queue.capsule == capsule.name }
    true
  end

  total_weight = capsules.sum(&:weight)

  result = capsules.filter_map do |capsule|
    weight_coef = capsule.weight / total_weight.to_f
    concurrency = (max_concurrency * weight_coef).to_i
    concurrency = 1 unless concurrency > 1
    queues =
      self.queues.select { |x| x.capsule == capsule.name }.map { |x| [x.name.to_s, x.weight] }
    Entry.new(capsule.name, queues, concurrency)
  end

  raise "No queues found for worker #{worker.inspect}" if result.empty?

  result
end

#configure_capsules!(config, priority_level:, max_concurrency:) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/umbrellio_utils/jobs.rb', line 42

def configure_capsules!(config, priority_level:, max_concurrency:)
  entries = capsules_for(priority_level, max_concurrency)

  unless entries.find { |x| x.capsule == :default }
    entries.last.capsule = :default # Default capsule should always be present in sidekiq
  end

  entries.each do |entry|
    config.capsule(entry.capsule) do |capsule|
      capsule.queues = entry.queues
      capsule.concurrency = entry.concurrency
    end
  end
end

#queuesObject



19
20
21
# File 'lib/umbrellio_utils/jobs.rb', line 19

def queues
  @queues ||= []
end

#register_capsule(name, worker: :default, weight: 1) ⇒ Object



27
28
29
30
# File 'lib/umbrellio_utils/jobs.rb', line 27

def register_capsule(name, worker: :default, weight: 1)
  workers.find { |x| x.name == worker } or raise "Worker not found: #{worker.inspect}"
  capsules << Capsule.new(name, worker, weight)
end

#register_queue(name, capsule: :default, weight: 1) ⇒ Object



32
33
34
35
# File 'lib/umbrellio_utils/jobs.rb', line 32

def register_queue(name, capsule: :default, weight: 1)
  capsules.find { |x| x.name == capsule } or raise "Capsule not found: #{capsule.inspect}"
  queues << Queue.new(name, capsule, weight)
end

#register_worker(name) ⇒ Object



23
24
25
# File 'lib/umbrellio_utils/jobs.rb', line 23

def register_worker(name)
  workers << Worker.new(name)
end

#retry_interval(error_count, min_interval:, max_interval:) ⇒ Object



37
38
39
40
# File 'lib/umbrellio_utils/jobs.rb', line 37

def retry_interval(error_count, min_interval:, max_interval:)
  interval = min_interval * (1.3**(error_count - 3))
  interval.clamp(min_interval, max_interval).round
end

#validate_queue_name!(queue_name) ⇒ Object



80
81
82
83
84
85
86
# File 'lib/umbrellio_utils/jobs.rb', line 80

def validate_queue_name!(queue_name)
  found = queues.any? do |queue|
    queue.name.to_s == queue_name.to_s
  end

  raise "Unknown queue: #{queue_name.inspect}" unless found
end

#workersObject



11
12
13
# File 'lib/umbrellio_utils/jobs.rb', line 11

def workers
  @workers ||= []
end