Class: Postburner::Job

Inherits:
ApplicationRecord show all
Includes:
Backburner::Queue
Defined in:
app/models/postburner/job.rb

Overview

Must implement a perform method, if an exception is raised the job doesn’t complete.

Job won’t run unless queued_at is set, and is set to a time prior to the time the job runs.

TODO Mailer uses ActiveJob::Arguments… probably should use that here as well. Decided how to migrate existing jobs or allow both - Opt to allow both: rescue from ActiveJob::DeserializationError and use the plain hash, probably log it too.

Add ‘cancelled_at` that blocks jobs performing if present.

Direct Known Subclasses

Mailer

Defined Under Namespace

Classes: AlreadyProcessed

Constant Summary collapse

LOG_LEVELS =
[
  :debug,
  :info,
  :warning,
  :error
].freeze

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.perform(id, _ = {}) ⇒ Object

tube: backburner.worker.queue.backburner-jobs



74
75
76
77
78
79
80
81
82
83
84
85
# File 'app/models/postburner/job.rb', line 74

def self.perform(id, _={})
  job = nil
  begin
    job = self.find(id)
  rescue ActiveRecord::RecordNotFound => e
    Rails.logger.warn <<-MSG
[Postburner::Job] [#{id}] Not Found.
    MSG
  end
  #job&.perform!(job.args)
  job.perform!(job.args)
end

Instance Method Details

#beanstalk_jobObject



189
190
191
192
193
194
195
196
# File 'app/models/postburner/job.rb', line 189

def beanstalk_job
  return unless self.bkid.present?
  return @_beanstalk_job if @_beanstalk_job

  @_beanstalk_job = Postburner.connection.beanstalk.jobs.find(self.bkid)

  @_beanstalk_job
end

#beanstalk_job!Object



198
199
200
201
# File 'app/models/postburner/job.rb', line 198

def beanstalk_job!
  @_beanstalk_job = nil
  self.beanstalk_job
end

#delete!Object



165
166
167
168
169
170
171
172
# File 'app/models/postburner/job.rb', line 165

def delete!
  return unless self.beanstalk_job
  begin
    self.beanstalk_job.delete
  rescue Beaneater::NotConnected => e
    self.beanstalk_job!.delete
  end
end

#elapsed_msObject

ms from attempting_at



237
238
239
240
# File 'app/models/postburner/job.rb', line 237

def elapsed_ms
  return unless self.attempting_at
  (Time.zone.now - self.attempting_at) * 1000
end

#intended_atObject



247
248
249
# File 'app/models/postburner/job.rb', line 247

def intended_at
  self.run_at ? self.run_at : self.queued_at
end

#kick!Object



174
175
176
177
178
179
180
181
# File 'app/models/postburner/job.rb', line 174

def kick!
  return unless self.beanstalk_job
  begin
    self.beanstalk_job.kick
  rescue Beaneater::NotConnected => e
    self.beanstalk_job!.kick
  end
end

#log(message, options = {}) ⇒ Object



220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'app/models/postburner/job.rb', line 220

def log(message, options={})
  options[:level] ||= :info
  options[:level] = :error unless LOG_LEVELS.member?(options[:level])

  self.logs << [
    Time.zone.now, # time
    {
      bkid:     self.bkid,
      level:    options[:level], # level
      message:  message, # message
      elapsed:  self.elapsed_ms, # ms from start
    }
  ]
end

#log!(message, options = {}) ⇒ Object



242
243
244
245
# File 'app/models/postburner/job.rb', line 242

def log!(message, options={})
  self.log(message, options)
  self.update_column :logs, self.logs
end

#log_exception(exception) ⇒ Object



203
204
205
206
207
208
209
210
211
212
213
# File 'app/models/postburner/job.rb', line 203

def log_exception(exception)
  self.errata << [
    Time.zone.now,
    {
      bkid:       self.bkid,
      class:      exception.class,
      message:    exception.message,
      backtrace:  exception.backtrace,
    }
  ]
end

#log_exception!(exception) ⇒ Object



215
216
217
218
# File 'app/models/postburner/job.rb', line 215

def log_exception!(exception)
  self.log_exception(exception)
  self.update_column :errata, self.errata
end

#perform!(args = {}) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'app/models/postburner/job.rb', line 87

def perform!(args={})
  _run_attempt_callbacks do
    self.attempting

    self.update_columns(
      attempting_at:  self.attempting_at,
      attempts:       self.attempts,
      attempt_count:  self.attempts.length,
      lag:            self.lag,
      processing_at:  Time.zone.now,
    )

    begin
      if self.queued_at.nil?
        self.log! "Not Queued", level: :error
        return
      end

      if self.queued_at > Time.zone.now
        self.log! "Future Queued", level: :error
        return
      end

      if self.processed_at.present?
        self.log! "Already Processed", level: :error
        self.delete!
        return
      end

      if self.removed_at.present?
        self.log! "Removed", level: :error
        return
      end

      if self.run_at && self.run_at > Time.zone.now
        response = self.insert! delay: self.run_at - Time.zone.now
        self.log! "PREMATURE; RE-INSERTED: #{response}"
        return
      end

      self.log!("START (bkid #{self.bkid})")

      _run_processing_callbacks do
        begin
          self.perform(args)
        rescue Exception => exception
          self.persist_metadata!
          self.log! '[Postburner] Exception raised during perform prevented completion.'
          raise exception
        end
      end

      self.log!("DONE (bkid #{self.bkid})")

      begin
        now = Time.zone.now
        _duration =  (now - self.processing_at) * 1000 rescue nil

        _run_processed_callbacks do
          persist_metadata!(
            processed_at: now,
            duration:     _duration,
          )
        end
      rescue Exception => e
        self.log_exception!(e)
        self.log! '[Postburner] Could not set data after processing.'
        # TODO README doesn't retry if Postburner is to blame
      end

    rescue Exception => exception
      self.log_exception!(exception)
      raise exception
    end
  end # run_callbacks :attempt

end

#queue!(options = {}) ⇒ Object

Raises:

  • (ActiveRecord::RecordInvalid)


38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'app/models/postburner/job.rb', line 38

def queue!(options={})
  return if self.queued_at.present? && self.bkid.present?
  raise ActiveRecord::RecordInvalid, "Can't queue unless valid." unless self.valid?
  raise AlreadyProcessed, "Processed at #{self.processed_at}" if self.processed_at

  at = options.delete(:at)
  now = Time.zone.now

  self.queued_at = now
  self.run_at = case
                when at.present?
                  # this is rudimentary, add error handling
                  options[:delay] ||= at.to_i - now.to_i
                  at
                when options[:delay].present?
                  now + options[:delay].seconds
                end

  @_insert_options = options

  self.save!
end

#remove!Object



183
184
185
186
187
# File 'app/models/postburner/job.rb', line 183

def remove!
  return if self.removed_at
  self.delete!
  self.update_column(:removed_at, Time.zone.now)
end

#requeue!(options = {}) ⇒ Object



61
62
63
64
65
66
# File 'app/models/postburner/job.rb', line 61

def requeue!(options={})
  self.delete!
  self.bkid, self.queued_at = nil, nil

  self.queue! options
end

#will_insert?Boolean

Returns:

  • (Boolean)


68
69
70
# File 'app/models/postburner/job.rb', line 68

def will_insert?
  @_insert_options.is_a? Hash
end