Module: UmbrellioUtils::Jobs
- Extended by:
- Jobs
- Included in:
- Jobs
- Defined in:
- lib/umbrellio_utils/jobs.rb
Defined Under Namespace
Classes: Capsule, Entry, Queue, Worker
Instance Method Summary
collapse
Instance Method Details
#capsules ⇒ Object
15
16
17
|
# File 'lib/umbrellio_utils/jobs.rb', line 15
# Registry of capsule definitions.
#
# The array is created empty on first access and the same object is returned
# on every later call, so callers may append to it directly.
#
# @return [Array] the memoized capsule list
def capsules
  return @capsules if @capsules

  @capsules = []
end
|
#capsules_for(worker, max_concurrency) ⇒ Object
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/umbrellio_utils/jobs.rb', line 57
# Builds the capsule entries for one worker, splitting +max_concurrency+
# between that worker's capsules in proportion to their weights.
#
# Only capsules registered for +worker+ that have at least one registered
# queue are considered.
#
# @param worker [String, Symbol] worker name; it is stringified and
#   underscored before being compared with each capsule's worker
# @param max_concurrency [Integer] concurrency budget to distribute
# @return [Array<Entry>] one entry per matching capsule, built from the capsule
#   name, its +[queue_name, weight]+ pairs and its concurrency share
# @raise [RuntimeError] if no capsule with queues matches +worker+
def capsules_for(worker, max_concurrency)
  # Stringify first so Symbol names (as used when registering capsules) are
  # accepted too; String input behaves exactly as before.
  worker_key = worker.to_s.underscore

  matching = capsules.select do |capsule|
    capsule.worker.to_s == worker_key && queues.any? { |queue| queue.capsule == capsule.name }
  end
  raise "No queues found for worker #{worker.inspect}" if matching.empty?

  total_weight = matching.sum(&:weight).to_f

  matching.map do |capsule|
    share = (max_concurrency * (capsule.weight / total_weight)).to_i
    capsule_queues = queues.select { |queue| queue.capsule == capsule.name }
    queue_weights = capsule_queues.map { |queue| [queue.name.to_s, queue.weight] }

    # A share that truncates to zero is raised to 1, so the shares may add up
    # to more than max_concurrency.
    Entry.new(capsule.name, queue_weights, [share, 1].max)
  end
end
|
#configure_capsules!(config, priority_level:, max_concurrency:) ⇒ Object
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/umbrellio_utils/jobs.rb', line 42
# Declares on +config+ every capsule entry computed by +capsules_for+ for
# +priority_level+.
#
# When none of the computed entries is named +:default+, the last entry is
# renamed to +:default+ before anything is declared.
#
# @param config [#capsule] object whose +capsule(name)+ yields something that
#   accepts +queues=+ and +concurrency=+ (presumably a Sidekiq configuration;
#   confirm against the caller)
# @param priority_level [Object] passed to +capsules_for+ as the worker name
# @param max_concurrency [Integer] passed to +capsules_for+ as the budget
# @return [Array] the entries that were declared
def configure_capsules!(config, priority_level:, max_concurrency:)
  entries = capsules_for(priority_level, max_concurrency)
  entries.last.capsule = :default if entries.none? { |entry| entry.capsule == :default }

  entries.each do |entry|
    config.capsule(entry.capsule) do |target|
      target.queues = entry.queues
      target.concurrency = entry.concurrency
    end
  end
end
|
#queues ⇒ Object
19
20
21
|
# File 'lib/umbrellio_utils/jobs.rb', line 19
# Registry of queue definitions.
#
# The array is created empty on first access and the same object is returned
# on every later call, so callers may append to it directly.
#
# @return [Array] the memoized queue list
def queues
  return @queues if @queues

  @queues = []
end
|
#register_capsule(name, worker: :default, weight: 1) ⇒ Object
27
28
29
30
|
# File 'lib/umbrellio_utils/jobs.rb', line 27
# Registers a capsule for an already registered worker.
#
# @param name [Object] capsule name
# @param worker [Object] name of the worker the capsule belongs to; it must
#   equal the +name+ of one of the registered workers
# @param weight [Numeric] weight stored on the capsule
# @return [Array] the capsule registry, including the new capsule
# @raise [RuntimeError] if no registered worker has the given name
def register_capsule(name, worker: :default, weight: 1)
  known = workers.any? { |registered| registered.name == worker }
  raise "Worker not found: #{worker.inspect}" unless known

  capsules.push(Capsule.new(name, worker, weight))
end
|
#register_queue(name, capsule: :default, weight: 1) ⇒ Object
32
33
34
35
|
# File 'lib/umbrellio_utils/jobs.rb', line 32
# Registers a queue inside an already registered capsule.
#
# @param name [Object] queue name
# @param capsule [Object] name of the capsule the queue belongs to; it must
#   equal the +name+ of one of the registered capsules
# @param weight [Numeric] weight stored on the queue
# @return [Array] the queue registry, including the new queue
# @raise [RuntimeError] if no registered capsule has the given name
def register_queue(name, capsule: :default, weight: 1)
  known = capsules.any? { |registered| registered.name == capsule }
  raise "Capsule not found: #{capsule.inspect}" unless known

  queues.push(Queue.new(name, capsule, weight))
end
|
#register_worker(name) ⇒ Object
23
24
25
|
# File 'lib/umbrellio_utils/jobs.rb', line 23
# Registers a worker under the given name.
#
# @param name [Object] worker name, wrapped in a new +Worker+
# @return [Array] the worker registry, including the new worker
def register_worker(name)
  workers.push(Worker.new(name))
end
|
#retry_interval(error_count, min_interval:, max_interval:) ⇒ Object
37
38
39
40
|
# File 'lib/umbrellio_utils/jobs.rb', line 37
# Computes the delay before the next retry.
#
# The raw delay is +min_interval+ scaled by 1.3 raised to
# +error_count - 3+, so it only exceeds +min_interval+ once +error_count+
# is above 3. The result is limited to the given bounds and rounded.
#
# @param error_count [Integer] number of errors so far
# @param min_interval [Numeric] lower bound of the result
# @param max_interval [Numeric] upper bound of the result
# @return [Integer] the rounded, bounded interval
def retry_interval(error_count, min_interval:, max_interval:)
  growth = 1.3**(error_count - 3)
  bounded = (min_interval * growth).clamp(min_interval, max_interval)
  bounded.round
end
|
#validate_queue_name!(queue_name) ⇒ Object
80
81
82
83
84
85
86
|
# File 'lib/umbrellio_utils/jobs.rb', line 80
# Ensures that a queue with the given name has been registered.
#
# Names are compared by their string form, so a Symbol and a String with the
# same text are treated as the same queue.
#
# @param queue_name [#to_s] name to look up
# @return [nil] when the queue is known
# @raise [RuntimeError] when no registered queue has that name
def validate_queue_name!(queue_name)
  wanted = queue_name.to_s
  return if queues.any? { |queue| queue.name.to_s == wanted }

  raise "Unknown queue: #{queue_name.inspect}"
end
|
#workers ⇒ Object
11
12
13
|
# File 'lib/umbrellio_utils/jobs.rb', line 11
# Registry of worker definitions.
#
# The array is created empty on first access and the same object is returned
# on every later call, so callers may append to it directly.
#
# @return [Array] the memoized worker list
def workers
  return @workers if @workers

  @workers = []
end
|