28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
# File 'lib/rails_autoscale_agent/worker_adapters/que.rb', line 28
# Measures per-queue latency for Que jobs and pushes one sample per queue
# into +store+.
#
# Latency is the age, in milliseconds, of the earliest +run_at+ among rows
# in +que_jobs+ that are unfinished, unexpired and have an error count of
# zero. Every queue name seen so far is remembered in +queues+, so a queue
# with no matching rows on this pass still gets a sample of 0.
#
# Nothing is pushed when the number of queues returned by the query is
# greater than +Config.instance.max_queues+; a warning is logged instead.
#
# @param store [#push] receives +push(latency_ms, time, queue)+ per queue
# @return [void]
def collect!(store)
  log_msg = String.new
  now = Time.now.utc
  sql = <<~SQL
    SELECT queue, min(run_at)
    FROM que_jobs
    WHERE finished_at IS NULL
    AND expired_at IS NULL
    AND error_count = 0
    GROUP BY 1
  SQL

  # NOTE(review): assumes select_rows_silently yields [queue, run_at] pairs,
  # as implied by the two-column SELECT - confirm against the helper.
  run_at_by_queue = Hash[select_rows_silently(sql)]

  # Don't collect worker metrics if there are unreasonable number of queues
  if run_at_by_queue.size > Config.instance.max_queues
    logger.warn "Skipping Que metrics - #{run_at_by_queue.size} queues exceeds the #{Config.instance.max_queues} queue limit"
    return
  end

  self.queues |= run_at_by_queue.keys

  queues.each do |queue|
    run_at = run_at_by_queue[queue]
    # Timestamps may come back as Strings. Convert all the way to a Time:
    # core Time#- only accepts a Time or a Numeric, so subtracting a
    # DateTime works only when something else has patched in a coercion.
    run_at = DateTime.parse(run_at).to_time if run_at.is_a?(String)

    # A run_at in the future (scheduled job) is reported as zero latency.
    latency_ms = run_at ? [((now - run_at) * 1000).ceil, 0].max : 0

    store.push latency_ms, now, queue
    log_msg << "que.#{queue}=#{latency_ms} "
  end

  logger.debug log_msg unless log_msg.empty?
end
|