Class: Sidekiq::Metrics::ExecutionTracker

Inherits: Object
Includes: Component
Defined in: lib/sidekiq/metrics/tracking.rb

Constant Summary

SHORT_TERM = 8 * 60 * 60
MID_TERM = 7 * 24 * 60 * 60
LONG_TERM = 90 * 24 * 60 * 60
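
These values are the Redis key TTLs applied by #flush: SHORT_TERM (8 hours) expires the per-minute buckets, while MID_TERM (7 days) and LONG_TERM (90 days) correspond to the hourly and daily rollups, which are currently commented out in #flush.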

Instance Attribute Summary

Attributes included from Component

#config

Instance Method Summary

#flush, #track

Methods included from Component

#fire_event, #handle_exception, #hostname, #identity, #inspect, #logger, #process_nonce, #redis, #safe_thread, #tid, #watchdog

Constructor Details

#initialize(config) ⇒ ExecutionTracker

Returns a new instance of ExecutionTracker.



# File 'lib/sidekiq/metrics/tracking.rb', line 13

def initialize(config)
  @config = config
  @jobs = Hash.new(0)
  @totals = Hash.new(0)
  @grams = Hash.new { |hash, key| hash[key] = Histogram.new(key) }
  @lock = Mutex.new
end
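
For orientation, a minimal instantiation sketch. Inside Sidekiq the tracker is created and driven by the metrics machinery itself, so application code normally never builds one; Sidekiq.default_configuration (Sidekiq 7+) is used here purely for illustration.

require "sidekiq"
require "sidekiq/metrics/tracking"

# Hypothetical standalone setup, shown only to make the constructor concrete.
tracker = Sidekiq::Metrics::ExecutionTracker.new(Sidekiq.default_configuration)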

Instance Method Details

#flush(time = Time.now) ⇒ Object
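
Persists the buffered per-class counters and histogram data to Redis under the current minute's bucket. Returns the number of hash fields written, or nil if nothing was processed since the last reset.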



# File 'lib/sidekiq/metrics/tracking.rb', line 57

def flush(time = Time.now)
  totals, jobs, grams = reset
  procd = totals["p"]
  fails = totals["f"]
  return if procd == 0 && fails == 0

  now = time.utc
  # nowdate = now.strftime("%Y%m%d")
  # nowhour = now.strftime("%Y%m%d|%-H")
  nowmin = now.strftime("%Y%m%d|%-H:%-M")
  count = 0

  redis do |conn|
    # persist fine-grained histogram data
    if grams.size > 0
      conn.pipelined do |pipe|
        grams.each do |_, gram|
          gram.persist(pipe, now)
        end
      end
    end

    # persist coarse-grained execution counts + execution millis.
    # Note: as of today we don't do anything with the
    # daily or hourly rollups.
    [
      # ["j", jobs, nowdate, LONG_TERM],
      # ["j", jobs, nowhour, MID_TERM],
      ["j", jobs, nowmin, SHORT_TERM]
    ].each do |prefix, data, bucket, ttl|
      conn.pipelined do |xa|
        stats = "#{prefix}|#{bucket}"
        data.each_pair do |key, value|
          xa.hincrby stats, key, value
          count += 1
        end
        xa.expire(stats, ttl)
      end
    end
    logger.debug "Flushed #{count} metrics"
    count
  end
end
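
For a concrete picture of what #flush writes, here is a minimal sketch of reading the current minute's bucket back, assuming direct access to Sidekiq's Redis connection; "SomeJob" is an illustrative class name, and the field suffixes mirror the "|p" / "|f" counters incremented in #track. (The supported way to consume this data is the Sidekiq Web UI's Metrics page, not raw Redis reads.)

require "sidekiq"

# Illustrative only: peek at the minute-level bucket written by #flush.
Sidekiq.redis do |conn|
  bucket = Time.now.utc.strftime("%Y%m%d|%-H:%-M")  # same key format as #flush
  conn.hgetall("j|#{bucket}")
  # => e.g. {"SomeJob|p" => "5", "SomeJob|f" => "1", ...} (Redis returns string values)
end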

#track(queue, klass) ⇒ Object
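
Times the given block (the job execution) and records per-class counters: execution time for successful runs, a failure count when an exception escapes, and a processed count in every case. Exceptions are re-raised after being counted.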



# File 'lib/sidekiq/metrics/tracking.rb', line 21

def track(queue, klass)
  start = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :millisecond)
  time_ms = 0
  begin
    begin
      yield
    ensure
      finish = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :millisecond)
      time_ms = finish - start
    end
    # We don't track time for failed jobs as they can have very unpredictable
    # execution times. It's more important to know the average time for successful
    # jobs so we can better recognize when a perf regression is introduced.
    track_time(klass, time_ms)
  rescue JobRetry::Skip
    # This is raised when an iterable job is interrupted.
    track_time(klass, time_ms)
    raise
  rescue Exception
    @lock.synchronize {
      @jobs["#{klass}|f"] += 1
      @totals["f"] += 1
    }
    raise
  ensure
    @lock.synchronize {
      @jobs["#{klass}|p"] += 1
      @totals["p"] += 1
    }
  end
end
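
A hedged usage sketch, continuing the hypothetical tracker from #initialize above. The block is the job execution being measured; the queue name and HardJob class are illustrative.

begin
  tracker.track("default", "HardJob") do
    HardJob.new.perform(123)  # hypothetical job class
  end
rescue => e
  # the failure was already counted (under "HardJob|f") before the re-raise
  puts "job failed: #{e.message}"
end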