Class: Jobs::Base

Inherits:
Object
  • Object
show all
Includes:
Sidekiq::Worker
Defined in:
app/jobs/base.rb

Direct Known Subclasses

AdminConfirmationEmail, AnonymizeUser, AutomaticGroupMembership, BackfillSidebarSiteSettings, BackupChunksMerger, BulkGrantTrustLevel, BulkInvite, BulkUserTitleUpdate, ChangeDisplayName, ConfirmSnsSubscription, CrawlTopicLink, CreateAvatarThumbnails, CreateBackup, CreateLinkedTopic, CreateUserReviewable, DeleteInaccessibleNotifications, DownloadAvatarFromUrl, DownloadBackupEmail, DownloadProfileBackgroundFromUrl, EmitWebHookEvent, EnableBootstrapMode, ExportCsvFile, ExportUserArchive, FeatureTopicUsers, GenerateTopicThumbnails, GroupPmAlert, GroupPmUpdateSummary, GroupSmtpEmail, IndexCategoryForSearch, IndexUserFieldsForSearch, InviteEmail, InvitePasswordInstructionsEmail, MakeEmbeddedTopicVisible, MassAwardBadge, MergeUser, NotifyCategoryChange, NotifyMailingListSubscribers, NotifyMovedPosts, NotifyPostRevision, NotifyReviewable, NotifyTagChange, Onceoff, PostAlert, PostUpdateTopicTrackingState, ProcessBulkInviteEmails, ProcessEmail, ProcessPost, ProcessSnsNotification, PublishGroupMembershipUpdates, PullHotlinkedImages, PushNotification, RebakeCustomEmojiPosts, RebakePostsForUpload, RefreshUsersReviewableCounts, RemoveBanner, RetrieveTopic, RunHeartbeat, RunProblemCheck, Scheduled, SendPushNotification, SendSystemMessage, SuspiciousLogin, SyncAclsForUploads, SyncTopicUserBookmarked, ToggleTopicClosed, TopicActionConverter, TopicTimerBase, TruncateUserFlagStats, UnpinTopic, UpdateGravatar, UpdateGroupMentions, UpdateHotlinkedRaw, UpdatePostUploadsSecureStatus, UpdateTopRedirection, UpdateTopicUploadSecurity, UpdateUsername, UserEmail

Defined Under Namespace

Classes: JobInstrumenter

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.acquire_cluster_concurrency_lock!Object



254
255
256
# File 'app/jobs/base.rb', line 254

# Tries to take the cluster-wide lock for this job class.
#
# Issues a Redis SET on the un-namespaced connection with `nx: true` and a
# 120 second expiry (`ex: 120`), using the class's cluster concurrency key.
#
# @return [Boolean] true when the SET reply was truthy, false otherwise
def self.acquire_cluster_concurrency_lock!
  redis = Discourse.redis.without_namespace
  reply = redis.set(cluster_concurrency_redis_key, 0, nx: true, ex: 120)
  reply ? true : false
end

.clear_cluster_concurrency_lock!Object



250
251
252
# File 'app/jobs/base.rb', line 250

# Releases the cluster-wide lock for this job class by deleting its Redis key
# on the un-namespaced connection.
#
# @return [Object] whatever the Redis `del` call returns
def self.clear_cluster_concurrency_lock!
  redis = Discourse.redis.without_namespace
  redis.del(cluster_concurrency_redis_key)
end

.cluster_concurrency(val) ⇒ Object

Raises:

  • (ArgumentError)


182
183
184
185
# File 'app/jobs/base.rb', line 182

# Declares the cluster concurrency setting for this job class.
#
# @param val [Integer, nil] must be 1 or nil
# @return [Integer, nil] the value that was stored
# @raise [ArgumentError] when `val` is anything other than 1 or nil
def self.cluster_concurrency(val)
  permitted = val.nil? || val == 1
  raise ArgumentError, "cluster_concurrency must be 1 or nil" unless permitted

  @cluster_concurrency = val
end

.cluster_concurrency_redis_keyObject



246
247
248
# File 'app/jobs/base.rb', line 246

# Builds the Redis key used for this class's cluster concurrency lock.
#
# @return [String] "cluster_concurrency:" followed by the receiver's string form
def self.cluster_concurrency_redis_key
  format("cluster_concurrency:%s", self)
end

.delayed_perform(opts = {}) ⇒ Object



215
216
217
# File 'app/jobs/base.rb', line 215

# Instantiates the job and runs #perform on it with the given options.
#
# @param opts [Hash] options forwarded unchanged to #perform
# @return [Object] whatever #perform returns
def self.delayed_perform(opts = {})
  job = new
  job.perform(opts)
end

.get_cluster_concurrencyObject



187
188
189
# File 'app/jobs/base.rb', line 187

# Reads the cluster concurrency setting stored on this class.
#
# @return [Integer, nil] the stored value, or nil when none has been set
def self.get_cluster_concurrency
  instance_variable_get(:@cluster_concurrency)
end

Instance Method Details

#error_context(opts, code_desc = nil, extra = {}) ⇒ Object

Construct an error context object for Discourse.handle_exception. Subclasses are encouraged to use this!

`opts` is the arguments passed to execute(). `code_desc` is a short string describing what the code was doing (optional). `extra` is for any other context you logged. Note that, when building your `extra`, the keys :opts, :job, and :message are set by this method, and :current_db and :current_hostname are used by handle_exception.



206
207
208
209
210
211
212
213
# File 'app/jobs/base.rb', line 206

# Builds the context hash that accompanies a reported job exception.
#
# The result always has :opts and :job; :message is added only when
# `code_desc` is truthy. `extra` is merged last, so its keys override the
# ones set here.
#
# @param opts [Hash] the arguments the job was executed with
# @param code_desc [String, nil] short description of what the code was doing
# @param extra [Hash, nil] additional context to merge in; nil is ignored
# @return [Hash] the assembled context
def error_context(opts, code_desc = nil, extra = {})
  context = { opts: opts, job: self.class }
  context[:message] = code_desc if code_desc
  context.merge!(extra) unless extra.nil?
  context
end

#execute(opts = {}) ⇒ Object



219
220
221
# File 'app/jobs/base.rb', line 219

# Placeholder for the job's actual work; this base version always raises.
#
# @param opts [Hash] the job arguments (unused here)
# @raise [RuntimeError] always, with the message "Overwrite me!"
def execute(opts = {})
  raise RuntimeError, "Overwrite me!"
end

#last_db_durationObject



223
224
225
# File 'app/jobs/base.rb', line 223

# Returns the value stored in @db_duration, falling back to 0 when it is
# unset (or otherwise falsy).
#
# @return [Numeric] the stored duration, or 0
def last_db_duration
  return 0 unless @db_duration

  @db_duration
end

#log(*args) ⇒ Object



191
192
193
194
195
196
# File 'app/jobs/base.rb', line 191

# Writes each argument to the Rails logger at info level, one line per
# argument, prefixed with the current time (formatted with :db) and the
# upper-cased class name in square brackets.
#
# @param args [Array<Object>] values to log; each is interpolated into a string
# @return [true]
def log(*args)
  args.each do |entry|
    # The timestamp is taken per entry, not once per call.
    timestamp = Time.now.to_formatted_s(:db)
    tag = self.class.name.upcase
    Rails.logger.info("#{timestamp}: [#{tag}] #{entry}")
  end

  true
end

#perform(*args) ⇒ Object



258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
# File 'app/jobs/base.rb', line 258

# Runs the job: calls #execute once for each target database and reports any
# failures.
#
# When the options contain :current_site_id only that database is used;
# otherwise every database returned by
# RailsMultisite::ConnectionManagement.all_dbs is visited.
#
# When the class has cluster concurrency enabled, the cluster lock is taken
# first. If it cannot be acquired the job is re-enqueued with a 10 second
# delay and this call returns without doing any work. While the job runs, a
# background thread keeps extending the lock's expiry to 120 seconds.
#
# Exceptions raised per database are collected, each is passed to
# Discourse.handle_job_exception, and the first one is re-raised wrapped.
#
# @param args [Array] job arguments; a trailing options hash is extracted
# @return [nil]
# @raise [HandledExceptionWrapper] wrapping the first collected exception
def perform(*args)
  # Tracks whether THIS invocation owns the lock, so cleanup never touches a
  # lock held by another job (e.g. when acquiring or re-enqueueing raised).
  lock_acquired = false
  keepalive_thread = nil
  finished = false

  if self.class.get_cluster_concurrency
    unless self.class.acquire_cluster_concurrency_lock!
      self.class.perform_in(10.seconds, *args)
      return
    end
    lock_acquired = true

    parent_thread = Thread.current
    cluster_concurrency_redis_key = self.class.cluster_concurrency_redis_key

    keepalive_thread =
      Thread.new do
        while parent_thread.alive? && !finished
          Discourse.redis.without_namespace.expire(cluster_concurrency_redis_key, 120)

          # Sleep for 60 seconds, but wake up every second to check if the job has been completed
          60.times do
            break if finished
            sleep 1
          end
        end
      end
  end

  opts = args.extract_options!.with_indifferent_access

  Sidekiq.redis { |r| r.set("last_job_perform_at", Time.now.to_i) } if ::Jobs.run_later?

  dbs =
    if opts[:current_site_id]
      [opts[:current_site_id]]
    else
      RailsMultisite::ConnectionManagement.all_dbs
    end

  exceptions = []
  dbs.each do |db|
    begin
      exception = {}

      RailsMultisite::ConnectionManagement.with_connection(db) do
        job_instrumenter =
          JobInstrumenter.new(job_class: self.class, opts: opts, db: db, jid: jid)
        begin
          I18n.locale =
            SiteSetting.default_locale || SiteSettings::DefaultsProvider::DEFAULT_LOCALE
          I18n.ensure_all_loaded!
          begin
            logster_env = {}
            Logster.add_to_env(logster_env, :job, self.class.to_s)
            Logster.add_to_env(logster_env, :db, db)
            Thread.current[Logster::Logger::LOGSTER_ENV] = logster_env

            execute(opts)
          rescue => e
            exception[:ex] = e
            exception[:other] = { problem_db: db }
          end
        rescue => e
          exception[:ex] = e
          exception[:message] = "While establishing database connection to #{db}"
          exception[:other] = { problem_db: db }
        ensure
          job_instrumenter.stop(exception: exception)
        end
      end

      exceptions << exception unless exception.empty?
    end
  end

  Thread.current[Logster::Logger::LOGSTER_ENV] = nil

  if exceptions.length > 0
    exceptions.each do |exception_hash|
      # The description is stored under :message above, so read the same key
      # here; otherwise it would never reach the error context.
      Discourse.handle_job_exception(
        exception_hash[:ex],
        error_context(opts, exception_hash[:message], exception_hash[:other]),
      )
    end
    raise HandledExceptionWrapper.new(exceptions[0][:ex])
  end

  nil
ensure
  begin
    if lock_acquired
      finished = true

      if keepalive_thread
        begin
          keepalive_thread.wakeup
        rescue ThreadError
          # The keepalive thread has already exited; nothing to wake.
        end
        keepalive_thread.join
      end

      self.class.clear_cluster_concurrency_lock!
    end
  ensure
    # Always release connections, even if lock cleanup raised.
    ActiveRecord::Base.connection_handler.clear_active_connections!
  end
end

#perform_immediately(*args) ⇒ Object



227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'app/jobs/base.rb', line 227

# Runs the job synchronously against the current database.
#
# Exceptions raised by #execute are passed to Discourse.handle_job_exception
# rather than propagated, in which case nil is returned.
#
# @param args [Array] job arguments; a trailing options hash is extracted
# @return [Object, nil] the result of #execute, or nil when it raised
# @raise [ArgumentError] when :current_site_id is given and differs from
#   RailsMultisite::ConnectionManagement.current_db
def perform_immediately(*args)
  opts = args.extract_options!.with_indifferent_access

  targets_other_db =
    opts.has_key?(:current_site_id) &&
      opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db

  if targets_other_db
    raise ArgumentError,
          "You can't connect to another database when executing a job synchronously."
  end

  begin
    execute(opts)
  rescue => error
    Discourse.handle_job_exception(error, error_context(opts))
    nil
  end
end