Class: PrometheusExporter::Instrumentation::DelayedJob

Inherits:
Object
  • Object
show all
Defined in:
lib/prometheus_exporter/instrumentation/delayed_job.rb

Constant Summary collapse

# Pulls the job class name out of text containing "job_class: <Name>".
# Capture 1 is the whole name as written (e.g. "Foo::BarJob"); capture 2 is
# the last repetition of the inner group, i.e. the trailing segment ("BarJob").
# #call selects capture 1 or 2 depending on include_module_name.
JOB_CLASS_REGEXP =
%r{job_class: ((\w+:{0,2})+)}.freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client: nil) ⇒ DelayedJob

Returns a new instance of DelayedJob.



28
29
30
# File 'lib/prometheus_exporter/instrumentation/delayed_job.rb', line 28

# Stores the client used to ship metrics.
#
# @param client [Object, nil] client to report through; when nil (or false)
#   PrometheusExporter::Client.default is used instead
def initialize(client: nil)
  @client = client
  @client ||= PrometheusExporter::Client.default
end

Class Method Details

.register_plugin(client: nil, include_module_name: false) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/prometheus_exporter/instrumentation/delayed_job.rb', line 8

# Registers a Delayed::Plugin subclass that wraps every :invoke_job lifecycle
# event and forwards the job to an instrumenter's #call.
#
# Does nothing when Delayed::Plugin is not defined; the check runs first so no
# instrumenter (and therefore no default client) is built needlessly.
#
# @param client [Object, nil] passed through to the instrumenter's constructor
# @param include_module_name [Boolean] forwarded to #call on each invocation
# @return [Object, nil] the result of appending the plugin to
#   Delayed::Worker.plugins, or nil when Delayed::Plugin is not defined
def register_plugin(client: nil, include_module_name: false)
  return unless defined?(Delayed::Plugin)

  instrumenter = new(client: client)

  plugin = Class.new(Delayed::Plugin) do
    callbacks do |lifecycle|
      lifecycle.around(:invoke_job) do |job, *args, &block|
        # These values are read on every job invocation, right before the job runs.
        max_attempts = Delayed::Worker.max_attempts
        enqueued_count = Delayed::Job.where(queue: job.queue).count
        pending_count = Delayed::Job.where(attempts: 0, locked_at: nil, queue: job.queue).count
        instrumenter.call(job, max_attempts, enqueued_count, pending_count, include_module_name,
                          *args, &block)
      end
    end
  end

  Delayed::Worker.plugins << plugin
end

Instance Method Details

#call(job, max_attempts, enqueued_count, pending_count, include_module_name, *args, &block) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/prometheus_exporter/instrumentation/delayed_job.rb', line 32

# Runs the job through the given block and always reports one "delayed_job"
# metric payload to the client afterwards, whether the block returned or raised.
#
# @param job [Object] must respond to run_at, attempts, handler and queue
# @param max_attempts [Object] reported as-is under :max_attempts
# @param enqueued_count [Object] reported as-is under :enqueued
# @param pending_count [Object] reported as-is under :pending
# @param include_module_name [Boolean] true reports the full matched class
#   name, false reports only its last segment
# @param args [Array] extra arguments handed to the block after the job
# @return [Object] whatever the block returns
def call(job, max_attempts, enqueued_count, pending_count, include_module_name, *args, &block)
  succeeded = false
  started_at = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
  run_delay = Time.current - job.run_at
  # The stored attempt count does not yet include the run in progress.
  attempt_number = job.attempts + 1
  outcome = block.call(job, *args)
  succeeded = true
  outcome
ensure
  elapsed = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - started_at

  # Capture 1 is the full class name, capture 2 its last segment; a handler
  # without a match yields an empty name.
  capture_index = include_module_name ? 1 : 2
  job_name = job.handler.to_s.match(JOB_CLASS_REGEXP).to_a[capture_index].to_s

  @client.send_json(
    type: "delayed_job",
    name: job_name,
    queue_name: job.queue,
    success: succeeded,
    duration: elapsed,
    latency: run_delay,
    attempts: attempt_number,
    max_attempts: max_attempts,
    enqueued: enqueued_count,
    pending: pending_count
  )
end