22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
# File 'lib/rails_autoscale_agent/worker_adapters/delayed_job.rb', line 22
# Samples Delayed Job queue latency (and, when long-running job tracking is
# enabled, the number of busy jobs) and pushes the measurements into +store+.
#
# Latency for a queue is the age, in whole milliseconds (rounded up, never
# negative), of the oldest +run_at+ among jobs that are neither locked nor
# failed. Queues seen in earlier calls but with no waiting jobs now report 0.
#
# If the number of queues with waiting jobs exceeds
# +Config.instance.max_queues+, a warning is logged and nothing is pushed.
#
# @param store [#push] receives +push(latency_ms, time, queue)+ for latency
#   samples and +push(busy_count, time, queue, :busy)+ for busy counts
# @return [void]
def collect!(store)
  log_msg = String.new
  t = Time.now.utc

  ready_sql = <<~SQL
    SELECT COALESCE(queue, 'default'), min(run_at)
    FROM delayed_jobs
    WHERE locked_at IS NULL
    AND failed_at IS NULL
    GROUP BY queue
  SQL
  run_at_by_queue = select_rows_silently(ready_sql).to_h

  max_queues = Config.instance.max_queues
  if run_at_by_queue.size > max_queues
    logger.warn "Skipping DelayedJob metrics - #{run_at_by_queue.size} queues exceeds the #{max_queues} queue limit"
    return
  end

  # Remember every queue ever seen so that drained queues keep reporting 0.
  self.queues = queues | run_at_by_queue.keys

  track_busy = track_long_running_jobs?
  if track_busy
    busy_sql = <<~SQL
      SELECT COALESCE(queue, 'default'), count(*)
      FROM delayed_jobs
      WHERE locked_at IS NOT NULL
      AND locked_by IS NOT NULL
      AND failed_at IS NULL
      GROUP BY 1
    SQL
    busy_count_by_queue = select_rows_silently(busy_sql).to_h
    self.queues = queues | busy_count_by_queue.keys
  end

  queues.each do |queue|
    run_at = run_at_by_queue[queue]
    # run_at may arrive as a String; DateTime.parse reads a zone-less string
    # as UTC, which Time.parse would not.
    run_at = DateTime.parse(run_at) if run_at.is_a?(String)
    # Time#- only accepts a Time or a Numeric in plain Ruby, so normalise
    # DateTime values instead of relying on ActiveSupport's coercion.
    run_at = run_at.to_time if run_at.is_a?(DateTime)

    # Jobs scheduled in the future would yield a negative age; clamp to 0.
    latency_ms = run_at ? [((t - run_at) * 1000).ceil, 0].max : 0
    store.push latency_ms, t, queue
    log_msg << "dj-qt.#{queue}=#{latency_ms} "

    next unless track_busy

    busy_count = busy_count_by_queue[queue] || 0
    # The busy sample is stamped with the time of the push, not with t.
    store.push busy_count, Time.now, queue, :busy
    log_msg << "dj-busy.#{queue}=#{busy_count} "
  end

  logger.debug log_msg unless log_msg.empty?
end
|