Module: Yabeda::ActiveJob

Defined in:
lib/yabeda/activejob.rb,
lib/yabeda/activejob/version.rb

Overview

A small set of Yabeda metrics for ActiveJob jobs.

Constant Summary collapse

# Histogram bucket boundaries, in seconds, used by the runtime and latency
# histograms registered in install!.
LONG_RUNNING_JOB_RUNTIME_BUCKETS = [
  # Standard buckets (from Prometheus).
  0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10,
  # Extra buckets for very long-running jobs (30 seconds up to 6 hours).
  30, 60, 120, 300, 1800, 3600, 21_600
].freeze
# Version string of Yabeda::ActiveJob.
VERSION = "0.6.0"

Class Method Summary collapse

Class Method Details

.install!Object

rubocop: disable Metrics/MethodLength, Metrics/BlockLength, Metrics/AbcSize



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/yabeda/activejob.rb', line 18

# Registers the ActiveJob metrics with Yabeda and subscribes to the ActiveJob
# notifications that feed them.
#
# Metrics declared in the +activejob+ group:
# * counters +executed_total+, +enqueued_total+, +success_total+, +failed_total+
# * histograms +runtime+ and +latency+ (both in seconds, using
#   LONG_RUNNING_JOB_RUNTIME_BUCKETS)
#
# Subscriptions:
# * "perform.active_job"       - success/failure, executed count and runtime
# * "perform_start.active_job" - latency between enqueue and start
# * "enqueue.active_job"       - enqueued count
#
# After each handled event, Yabeda::ActiveJob.after_event_block is invoked
# with the event when it responds to +call+.
def self.install!
  Yabeda.configure do
    group :activejob

    counter :executed_total, tags: %i[queue activejob executions],
                             comment: "A counter of the total number of activejobs executed."
    counter :enqueued_total, tags: %i[queue activejob executions],
                             comment: "A counter of the total number of activejobs enqueued."
    counter :success_total, tags: %i[queue activejob executions],
                            comment: "A counter of the total number of activejobs successfully processed."
    counter :failed_total, tags: %i[queue activejob executions failure_reason],
                           comment: "A counter of the total number of jobs failed for an activejob."

    histogram :runtime, comment: "A histogram of the activejob execution time.",
                        unit: :seconds, per: :activejob,
                        tags: %i[queue activejob executions],
                        buckets: LONG_RUNNING_JOB_RUNTIME_BUCKETS

    histogram :latency, comment: "The job latency, the difference in seconds between enqueued and running time",
                        unit: :seconds, per: :activejob,
                        tags: %i[queue activejob executions],
                        buckets: LONG_RUNNING_JOB_RUNTIME_BUCKETS

    # Builds the tags shared by every metric. All values are stringified so
    # that the same job yields identical tag values in every subscriber
    # (previously only the "perform.active_job" handler stringified +queue+).
    base_labels = lambda do |job|
      {
        activejob: job.class.to_s,
        queue: job.queue_name.to_s,
        executions: job.executions.to_s,
      }
    end

    # Runs the optional user-supplied hook once an event has been recorded.
    run_after_event = lambda do |event|
      hook = Yabeda::ActiveJob.after_event_block
      hook.call(event) if hook.respond_to?(:call)
    end

    # job complete event
    ActiveSupport::Notifications.subscribe "perform.active_job" do |*args|
      event = ActiveSupport::Notifications::Event.new(*args)
      # NOTE(review): unlike the two subscribers below, this handler does not
      # merge payload values for Yabeda.default_tags into the labels.
      # Behaviour kept as-is; confirm whether that is intentional.
      labels = base_labels.call(event.payload[:job])

      if event.payload[:exception].present?
        # The first element of payload[:exception] is used as the failure
        # reason (presumably the exception class name; verify against the
        # Rails instrumentation docs).
        activejob_failed_total.increment(
          labels.merge(failure_reason: event.payload[:exception].first.to_s),
        )
      else
        activejob_success_total.increment(labels)
      end

      activejob_executed_total.increment(labels)
      # event.duration is converted from milliseconds to seconds.
      activejob_runtime.measure(labels, Yabeda::ActiveJob.ms2s(event.duration))
      run_after_event.call(event)
    end

    # start job event
    ActiveSupport::Notifications.subscribe "perform_start.active_job" do |*args|
      event = ActiveSupport::Notifications::Event.new(*args)
      labels = base_labels.call(event.payload[:job])

      # Copy payload values for any configured default tag that the base
      # labels do not already provide.
      labels.merge!(event.payload.slice(*Yabeda.default_tags.keys - labels.keys))
      job_latency = Yabeda::ActiveJob.job_latency(event)
      activejob_latency.measure(labels, job_latency) if job_latency.present?
      run_after_event.call(event)
    end

    # enqueue job event
    ActiveSupport::Notifications.subscribe "enqueue.active_job" do |*args|
      event = ActiveSupport::Notifications::Event.new(*args)
      labels = base_labels.call(event.payload[:job])

      # Copy payload values for any configured default tag that the base
      # labels do not already provide.
      labels.merge!(event.payload.slice(*Yabeda.default_tags.keys - labels.keys))
      activejob_enqueued_total.increment(labels)
      run_after_event.call(event)
    end
  end
end

.job_latency(event) ⇒ Object

rubocop: enable Metrics/MethodLength, Metrics/BlockLength, Metrics/AbcSize



95
96
97
98
99
100
101
102
103
# File 'lib/yabeda/activejob.rb', line 95

# Computes how long a job waited between being enqueued and the end time of
# the given notification event.
#
# @param event [#payload, #end] notification event whose payload holds the
#   job under +:job+
# @return [Object, nil] the result of subtracting the parsed enqueue time
#   from the parsed event end time (a Float number of seconds when both are
#   Time instances), or nil when the job has no +enqueued_at+ value
def self.job_latency(event)
  raw_enqueued_at = event.payload[:job].enqueued_at
  return unless raw_enqueued_at.present?

  # Both values go through parse_event_time so they can be subtracted.
  enqueued_at = parse_event_time(raw_enqueued_at)
  started_at = parse_event_time(event.end)
  started_at - enqueued_at
end

.ms2s(milliseconds) ⇒ Object



105
106
107
# File 'lib/yabeda/activejob.rb', line 105

# Converts a duration in milliseconds to seconds, rounded to three decimal
# places.
#
# @param milliseconds [#to_f] duration in milliseconds
# @return [Float] duration in seconds
def self.ms2s(milliseconds)
  seconds = milliseconds.to_f / 1000.0
  seconds.round(3)
end

.parse_event_time(time) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/yabeda/activejob.rb', line 109

# Normalises the different time representations this module receives into a
# time object.
#
# * Time values are returned unchanged.
# * Strings are parsed with Time.parse and converted to UTC.
# * Anything else is treated as an epoch number: values above 1e12 are taken
#   to be milliseconds and converted with ms2s, others are taken as seconds.
#
# @param time [Time, String, Numeric] value to normalise
# @return [Time]
def self.parse_event_time(time)
  case time
  when Time
    time
  when String
    Time.parse(time).utc
  else
    # Epoch values this large can only be milliseconds, not seconds.
    epoch_seconds = time > 1e12 ? ms2s(time) : time
    Time.at(epoch_seconds).utc
  end
end