Class: Jobs::Base

Inherits:
Object
  • Object
show all
Includes:
Sidekiq::Worker
Defined in:
app/jobs/base.rb

Direct Known Subclasses

AdminConfirmationEmail, AnonymizeUser, AutomaticGroupMembership, BackfillSidebarSiteSettings, BackupChunksMerger, BulkGrantTrustLevel, BulkInvite, BulkUserTitleUpdate, ChangeDisplayName, ConfirmSnsSubscription, CrawlTopicLink, CreateAvatarThumbnails, CreateBackup, CreateLinkedTopic, CreateUserReviewable, DeleteInaccessibleNotifications, DownloadAvatarFromUrl, DownloadBackupEmail, DownloadProfileBackgroundFromUrl, EmitWebHookEvent, EnableBootstrapMode, ExportCsvFile, ExportUserArchive, FeatureTopicUsers, GenerateTopicThumbnails, GroupPmAlert, GroupPmUpdateSummary, GroupSmtpEmail, IndexCategoryForSearch, InviteEmail, InvitePasswordInstructionsEmail, MakeEmbeddedTopicVisible, MassAwardBadge, MergeUser, NotifyCategoryChange, NotifyMailingListSubscribers, NotifyMovedPosts, NotifyPostRevision, NotifyReviewable, NotifyTagChange, Onceoff, PostAlert, PostUpdateTopicTrackingState, ProcessBulkInviteEmails, ProcessEmail, ProcessPost, ProcessSnsNotification, PublishGroupMembershipUpdates, PullHotlinkedImages, PushNotification, RebakeCustomEmojiPosts, RebakePostsForUpload, RefreshUsersReviewableCounts, RemoveBanner, RetrieveTopic, RunHeartbeat, Scheduled, SendPushNotification, SendSystemMessage, StreamTopicSummary, SuspiciousLogin, SyncAclsForUploads, SyncTopicUserBookmarked, ToggleTopicClosed, TopicActionConverter, TopicTimerBase, TruncateUserFlagStats, UnpinTopic, UpdateGravatar, UpdateGroupMentions, UpdateHotlinkedRaw, UpdatePostUploadsSecureStatus, UpdateS3Inventory, UpdateTopRedirection, UpdateTopicUploadSecurity, UpdateUsername, UserEmail

Defined Under Namespace

Classes: JobInstrumenter

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.acquire_cluster_concurrency_lock! ⇒ Object



236
237
238
# File 'app/jobs/base.rb', line 236

# Attempts to take the cluster concurrency lock for this job class.
#
# Issues a Redis SET on the un-namespaced connection for
# cluster_concurrency_redis_key with value 0, passing `nx: true` and
# `ex: 120` (set only if absent, expiring after 120 seconds).
#
# @return [Boolean] true when the SET call returned a truthy value,
#   false otherwise
def self.acquire_cluster_concurrency_lock!
  redis = Discourse.redis.without_namespace
  reply = redis.set(cluster_concurrency_redis_key, 0, nx: true, ex: 120)
  reply ? true : false
end

.clear_cluster_concurrency_lock! ⇒ Object



232
233
234
# File 'app/jobs/base.rb', line 232

# Releases the cluster concurrency lock for this job class by deleting
# cluster_concurrency_redis_key on the un-namespaced Redis connection.
#
# @return [Object] whatever the Redis `del` call returns
def self.clear_cluster_concurrency_lock!
  redis = Discourse.redis.without_namespace
  redis.del(cluster_concurrency_redis_key)
end

.cluster_concurrency(val) ⇒ Object

Raises:

  • (ArgumentError)


164
165
166
167
# File 'app/jobs/base.rb', line 164

# Declares the cluster concurrency setting for this job class.
#
# @param val [Integer, nil] only 1 or nil is accepted
# @return [Integer, nil] the value that was stored
# @raise [ArgumentError] if val is anything other than 1 or nil
def self.cluster_concurrency(val)
  unless val.nil? || val == 1
    raise ArgumentError, "cluster_concurrency must be 1 or nil"
  end

  @cluster_concurrency = val
end

.cluster_concurrency_redis_key ⇒ Object



228
229
230
# File 'app/jobs/base.rb', line 228

# Builds the Redis key used for this job class's cluster concurrency lock.
#
# @return [String] "cluster_concurrency:" followed by the receiver's
#   string form
def self.cluster_concurrency_redis_key
  owner = self
  "cluster_concurrency:#{owner}"
end

.delayed_perform(opts = {}) ⇒ Object



197
198
199
# File 'app/jobs/base.rb', line 197

# Instantiates the job and runs #perform on it in the calling thread.
#
# @param opts [Hash] options forwarded unchanged to #perform
# @return [Object] whatever #perform returns
def self.delayed_perform(opts = {})
  job = new
  job.perform(opts)
end

.get_cluster_concurrency ⇒ Object



169
170
171
# File 'app/jobs/base.rb', line 169

# Reads back the value stored by .cluster_concurrency.
#
# @return [Integer, nil] the stored value, or nil if none has been set
def self.get_cluster_concurrency
  defined?(@cluster_concurrency) ? @cluster_concurrency : nil
end

Instance Method Details

#error_context(opts, code_desc = nil, extra = {}) ⇒ Object

Construct an error context object for Discourse.handle_exception. Subclasses are encouraged to use this!

`opts` is the arguments passed to execute(). `code_desc` is a short string describing what the code was doing (optional). `extra` is for any other context you logged. Note that, when building your `extra`, the keys :opts, :job, and :message are set by this method (same-named keys in `extra` overwrite them), and :current_db and :current_hostname are used by handle_exception.



188
189
190
191
192
193
194
195
# File 'app/jobs/base.rb', line 188

# Builds the context hash that accompanies a reported job exception.
#
# @param opts [Hash] the arguments the job was executed with
# @param code_desc [String, nil] optional short description of what the
#   code was doing; stored under :message when truthy
# @param extra [Hash, nil] additional context merged in last, so its keys
#   overwrite :opts, :job and :message on collision
# @return [Hash] context containing :opts, :job (the job's class),
#   optionally :message, plus everything from extra
def error_context(opts, code_desc = nil, extra = {})
  context = { opts: opts, job: self.class }
  context[:message] = code_desc if code_desc
  context.merge!(extra) unless extra.nil?
  context
end

#execute(opts = {}) ⇒ Object



201
202
203
# File 'app/jobs/base.rb', line 201

# Placeholder for the job's actual work. This base implementation always
# raises; the message asks for the method to be overwritten.
#
# @param opts [Hash] job options (unused here)
# @raise [RuntimeError] always, with the message "Overwrite me!"
def execute(opts = {})
  raise RuntimeError, "Overwrite me!"
end

#last_db_duration ⇒ Object



205
206
207
# File 'app/jobs/base.rb', line 205

# Returns the value of @db_duration, falling back to 0 when it is unset
# (or otherwise falsy).
#
# @return [Object] @db_duration if truthy, else 0
def last_db_duration
  return 0 unless @db_duration

  @db_duration
end

#log(*args) ⇒ Object



173
174
175
176
177
178
# File 'app/jobs/base.rb', line 173

# Writes each argument to Rails.logger at info level, one line per
# argument, prefixed with the current time (:db format) and the upcased
# class name in square brackets.
#
# @param args [Array<Object>] values to log; each is interpolated into
#   its own line
# @return [true] always
def log(*args)
  args.each do |entry|
    # Timestamp and tag are computed per entry, inside the loop, so a call
    # with no arguments touches neither Time nor the class name.
    timestamp = Time.now.to_formatted_s(:db)
    job_tag = self.class.name.upcase
    Rails.logger.info("#{timestamp}: [#{job_tag}] #{entry}")
  end

  true
end

#perform(*args) ⇒ Object



240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'app/jobs/base.rb', line 240

# Runs the job: optionally takes the cluster concurrency lock, then calls
# #execute once per target database and reports any exceptions raised.
#
# When the class has a cluster concurrency value and the lock cannot be
# acquired, the job is re-enqueued with `perform_in(10.seconds, *args)` and
# this call returns without executing. While the lock is held, a background
# thread re-applies the 120-second expiry on the lock key every 60 seconds.
#
# @param args [Array] job arguments; the trailing options hash is extracted
#   and passed to #execute. If it contains :current_site_id only that
#   database is used, otherwise every database returned by
#   RailsMultisite::ConnectionManagement.all_dbs.
# @return [nil]
# @raise [HandledExceptionWrapper] wrapping the first collected exception,
#   after every collected exception has been passed to
#   Discourse.handle_job_exception
def perform(*args)
  # Set only once the lock is really ours, so the ensure block never releases
  # a lock this call did not take (e.g. when acquiring or re-enqueueing raised).
  lock_acquired = false
  keepalive_thread = nil
  finished = false

  if self.class.get_cluster_concurrency
    if !self.class.acquire_cluster_concurrency_lock!
      self.class.perform_in(10.seconds, *args)
      return
    end
    lock_acquired = true
    parent_thread = Thread.current
    cluster_concurrency_redis_key = self.class.cluster_concurrency_redis_key

    # Keep extending the lock's expiry for as long as this job is running.
    keepalive_thread =
      Thread.new do
        while parent_thread.alive? && !finished
          Discourse.redis.without_namespace.expire(cluster_concurrency_redis_key, 120)
          sleep 60
        end
      end
  end

  opts = args.extract_options!.with_indifferent_access

  Sidekiq.redis { |r| r.set("last_job_perform_at", Time.now.to_i) } if ::Jobs.run_later?

  dbs =
    if opts[:current_site_id]
      [opts[:current_site_id]]
    else
      RailsMultisite::ConnectionManagement.all_dbs
    end

  exceptions = []
  dbs.each do |db|
    begin
      # Filled with :ex, :other and (for setup failures) :message on error;
      # left empty on success.
      exception = {}

      RailsMultisite::ConnectionManagement.with_connection(db) do
        job_instrumenter =
          JobInstrumenter.new(job_class: self.class, opts: opts, db: db, jid: jid)
        begin
          I18n.locale =
            SiteSetting.default_locale || SiteSettings::DefaultsProvider::DEFAULT_LOCALE
          I18n.ensure_all_loaded!
          begin
            logster_env = {}
            Logster.add_to_env(logster_env, :job, self.class.to_s)
            Logster.add_to_env(logster_env, :db, db)
            Thread.current[Logster::Logger::LOGSTER_ENV] = logster_env

            execute(opts)
          rescue => e
            # Failure inside the job body itself.
            exception[:ex] = e
            exception[:other] = { problem_db: db }
          end
        rescue => e
          # Failure in the per-database setup that precedes #execute.
          exception[:ex] = e
          exception[:message] = "While establishing database connection to #{db}"
          exception[:other] = { problem_db: db }
        ensure
          job_instrumenter.stop(exception: exception)
        end
      end

      exceptions << exception unless exception.empty?
    end
  end

  Thread.current[Logster::Logger::LOGSTER_ENV] = nil

  if exceptions.length > 0
    exceptions.each do |exception_hash|
      # The description is stored under :message above; reading any other key
      # would silently drop it from the reported context.
      Discourse.handle_job_exception(
        exception_hash[:ex],
        error_context(opts, exception_hash[:message], exception_hash[:other]),
      )
    end
    raise HandledExceptionWrapper.new(exceptions[0][:ex])
  end

  nil
ensure
  if lock_acquired
    finished = true
    # The thread may be missing if starting it failed after the lock was taken.
    if keepalive_thread
      keepalive_thread.wakeup
      keepalive_thread.join
    end
    self.class.clear_cluster_concurrency_lock!
  end
  ActiveRecord::Base.connection_handler.clear_active_connections!
end

#perform_immediately(*args) ⇒ Object



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'app/jobs/base.rb', line 209

# Runs #execute synchronously in the current thread against the current
# database.
#
# An exception raised by #execute is passed to
# Discourse.handle_job_exception and not re-raised here; in that case the
# method returns nil.
#
# @param args [Array] job arguments; the trailing options hash is extracted
#   and passed to #execute
# @return [Object, nil] the return value of #execute, or nil if it raised
# @raise [ArgumentError] if :current_site_id is present and differs from
#   RailsMultisite::ConnectionManagement.current_db
def perform_immediately(*args)
  opts = args.extract_options!.with_indifferent_access

  targets_other_db =
    opts.has_key?(:current_site_id) &&
      opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db
  if targets_other_db
    raise ArgumentError,
          "You can't connect to another database when executing a job synchronously."
  end

  begin
    execute(opts)
  rescue => error
    Discourse.handle_job_exception(error, error_context(opts))
    nil
  end
end