Class: Sidekiq::Stats
- Inherits:
-
Object
- Object
- Sidekiq::Stats
- Defined in:
- lib/sidekiq/api.rb
Overview
Defined Under Namespace
Classes: History
Instance Method Summary collapse
- #dead_size ⇒ Object
- #default_queue_latency ⇒ Object
- #enqueued ⇒ Object
- #failed ⇒ Object
- #fetch_stats! ⇒ Object private
-
#fetch_stats_fast! ⇒ Object
private
O(1) redis calls.
-
#fetch_stats_slow! ⇒ Object
private
O(number of processes + number of queues) redis calls.
-
#initialize ⇒ Stats
constructor
A new instance of Stats.
- #processed ⇒ Object
- #processes_size ⇒ Object
- #queues ⇒ Object
- #reset(*stats) ⇒ Object private
- #retry_size ⇒ Object
- #scheduled_size ⇒ Object
- #workers_size ⇒ Object
Constructor Details
#initialize ⇒ Stats
Returns a new instance of Stats.
28 29 30 |
# File 'lib/sidekiq/api.rb', line 28 def initialize fetch_stats_fast! end |
Instance Method Details
#dead_size ⇒ Object
48 49 50 |
# File 'lib/sidekiq/api.rb', line 48 def dead_size stat :dead_size end |
#default_queue_latency ⇒ Object
64 65 66 |
# File 'lib/sidekiq/api.rb', line 64 def default_queue_latency stat :default_queue_latency end |
#enqueued ⇒ Object
52 53 54 |
# File 'lib/sidekiq/api.rb', line 52 def enqueued stat :enqueued end |
#failed ⇒ Object
36 37 38 |
# File 'lib/sidekiq/api.rb', line 36 def failed stat :failed end |
#fetch_stats! ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
151 152 153 154 |
# File 'lib/sidekiq/api.rb', line 151 def fetch_stats! fetch_stats_fast! fetch_stats_slow! end |
#fetch_stats_fast! ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
O(1) redis calls
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/sidekiq/api.rb', line 85 def fetch_stats_fast! pipe1_res = Sidekiq.redis { |conn| conn.pipelined do |pipeline| pipeline.get("stat:processed") pipeline.get("stat:failed") pipeline.zcard("schedule") pipeline.zcard("retry") pipeline.zcard("dead") pipeline.scard("processes") pipeline.lindex("queue:default", -1) end } default_queue_latency = if (entry = pipe1_res[6]) job = begin Sidekiq.load_json(entry) rescue {} end now = Time.now.to_f thence = job["enqueued_at"] || now now - thence else 0 end @stats = { processed: pipe1_res[0].to_i, failed: pipe1_res[1].to_i, scheduled_size: pipe1_res[2], retry_size: pipe1_res[3], dead_size: pipe1_res[4], processes_size: pipe1_res[5], default_queue_latency: default_queue_latency } end |
#fetch_stats_slow! ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
O(number of processes + number of queues) redis calls
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/sidekiq/api.rb', line 125 def fetch_stats_slow! processes = Sidekiq.redis { |conn| conn.sscan("processes").to_a } queues = Sidekiq.redis { |conn| conn.sscan("queues").to_a } pipe2_res = Sidekiq.redis { |conn| conn.pipelined do |pipeline| processes.each { |key| pipeline.hget(key, "busy") } queues.each { |queue| pipeline.llen("queue:#{queue}") } end } s = processes.size workers_size = pipe2_res[0...s].sum(&:to_i) enqueued = pipe2_res[s..].sum(&:to_i) @stats[:workers_size] = workers_size @stats[:enqueued] = enqueued @stats end |
#processed ⇒ Object
32 33 34 |
# File 'lib/sidekiq/api.rb', line 32 def processed stat :processed end |
#processes_size ⇒ Object
56 57 58 |
# File 'lib/sidekiq/api.rb', line 56 def processes_size stat :processes_size end |
#queues ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/sidekiq/api.rb', line 68 def queues Sidekiq.redis do |conn| queues = conn.sscan("queues").to_a lengths = conn.pipelined { |pipeline| queues.each do |queue| pipeline.llen("queue:#{queue}") end } array_of_arrays = queues.zip(lengths).sort_by { |_, size| -size } array_of_arrays.to_h end end |
#reset(*stats) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/sidekiq/api.rb', line 157 def reset(*stats) all = %w[failed processed] stats = stats.empty? ? all : all & stats.flatten.compact.map(&:to_s) mset_args = [] stats.each do |stat| mset_args << "stat:#{stat}" mset_args << 0 end Sidekiq.redis do |conn| conn.mset(*mset_args) end end |
#retry_size ⇒ Object
44 45 46 |
# File 'lib/sidekiq/api.rb', line 44 def retry_size stat :retry_size end |
#scheduled_size ⇒ Object
40 41 42 |
# File 'lib/sidekiq/api.rb', line 40 def scheduled_size stat :scheduled_size end |
#workers_size ⇒ Object
60 61 62 |
# File 'lib/sidekiq/api.rb', line 60 def workers_size stat :workers_size end |