Class: Jobs::Base::JobInstrumenter

Inherits:
Object
  • Object
show all
Defined in:
app/jobs/base.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job_class:, opts:, db:, jid:) ⇒ JobInstrumenter

Returns a new instance of JobInstrumenter.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'app/jobs/base.rb', line 37

def initialize(job_class:, opts:, db:, jid:)
  return unless enabled?
  self.class.mutex.synchronize do
    @data = {}

    @data["hostname"] = Discourse.os_hostname
    @data["pid"] = Process.pid # Pid
    @data["database"] = db # DB name - multisite db name it ran on
    @data["job_id"] = jid # Job unique ID
    @data["job_name"] = job_class.name # Job Name - eg: Jobs::AboutStats
    @data["job_type"] = job_class.try(:scheduled?) ? "scheduled" : "regular" # Job Type - either s for scheduled or r for regular
    @data["opts"] = opts.to_json # Params - json encoded params for the job

    if ENV["DISCOURSE_LOG_SIDEKIQ_INTERVAL"]
      @data["status"] = "starting"
      write_to_log
    end

    @data["status"] = "pending"
    @start_timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    self.class.ensure_interval_logging!
    @@active_jobs ||= []
    @@active_jobs << self

    MethodProfiler.ensure_discourse_instrumentation!
    MethodProfiler.start
  end
end

Class Method Details

.ensure_interval_logging!Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'app/jobs/base.rb', line 139

def self.ensure_interval_logging!
  interval = ENV["DISCOURSE_LOG_SIDEKIQ_INTERVAL"]
  return if !interval
  interval = interval.to_i
  @@interval_thread ||=
    Thread.new do
      begin
        loop do
          sleep interval
          mutex.synchronize do
            @@active_jobs.each { |j| j.write_to_log if j.current_duration > interval }
          end
        end
      rescue Exception => e
        Discourse.warn_exception(
          e,
          message: "Sidekiq interval logging thread terminated unexpectedly",
        )
      end
    end
end

.mutexObject



135
136
137
# File 'app/jobs/base.rb', line 135

def self.mutex
  @@mutex ||= Mutex.new
end

.raw_log(message) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'app/jobs/base.rb', line 93

def self.raw_log(message)
  @@logger ||=
    begin
      f = File.open "#{Rails.root}/log/sidekiq.log", "a"
      f.sync = true
      Logger.new f
    end

  @@log_queue ||= Queue.new

  if !defined?(@@log_thread) || !@@log_thread.alive?
    @@log_thread =
      Thread.new do
        loop do
          @@logger << @@log_queue.pop
        rescue Exception => e
          Discourse.warn_exception(
            e,
            message: "Exception encountered while logging Sidekiq job",
          )
        end
      end
  end

  @@log_queue.push(message)
end

Instance Method Details

#current_durationObject



120
121
122
# File 'app/jobs/base.rb', line 120

def current_duration
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - @start_timestamp
end

#enabled?Boolean

Returns:

  • (Boolean)


131
132
133
# File 'app/jobs/base.rb', line 131

def enabled?
  ENV["DISCOURSE_LOG_SIDEKIQ"] == "1"
end

#stop(exception:) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'app/jobs/base.rb', line 67

def stop(exception:)
  return unless enabled?
  self.class.mutex.synchronize do
    profile = MethodProfiler.stop

    @@active_jobs.delete(self)

    @data["duration"] = profile[:total_duration] # Duration - length in seconds it took to run
    @data["sql_duration"] = profile.dig(:sql, :duration) || 0 # Sql Duration (s)
    @data["sql_calls"] = profile.dig(:sql, :calls) || 0 # Sql Statements - how many statements ran
    @data["redis_duration"] = profile.dig(:redis, :duration) || 0 # Redis Duration (s)
    @data["redis_calls"] = profile.dig(:redis, :calls) || 0 # Redis commands
    @data["net_duration"] = profile.dig(:net, :duration) || 0 # Redis Duration (s)
    @data["net_calls"] = profile.dig(:net, :calls) || 0 # Redis commands

    if exception.present?
      @data["exception"] = exception # Exception - if job fails a json encoded exception
      @data["status"] = "failed"
    else
      @data["status"] = "success" # Status - fail, success, pending
    end

    write_to_log
  end
end

#write_to_logObject



124
125
126
127
128
129
# File 'app/jobs/base.rb', line 124

def write_to_log
  return unless enabled?
  @data["@timestamp"] = Time.now
  @data["duration"] = current_duration if @data["status"] == "pending"
  self.class.raw_log("#{@data.to_json}\n")
end