Class: Jobs::Base
- Inherits:
-
Object
show all
- Includes:
- Sidekiq::Worker
- Defined in:
- app/jobs/base.rb
Direct Known Subclasses
AdminConfirmationEmail, AnonymizeUser, AutomaticGroupMembership, BackfillSidebarSiteSettings, BackupChunksMerger, BulkGrantTrustLevel, BulkInvite, BulkUserTitleUpdate, ChangeDisplayName, ConfirmSnsSubscription, CrawlTopicLink, CreateAvatarThumbnails, CreateBackup, CreateLinkedTopic, CreateUserReviewable, DeleteInaccessibleNotifications, DownloadAvatarFromUrl, DownloadBackupEmail, DownloadProfileBackgroundFromUrl, EmitWebHookEvent, EnableBootstrapMode, ExportCsvFile, ExportUserArchive, FeatureTopicUsers, GenerateTopicThumbnails, GroupPmAlert, GroupPmUpdateSummary, GroupSmtpEmail, IndexCategoryForSearch, IndexUserFieldsForSearch, InviteEmail, InvitePasswordInstructionsEmail, MakeEmbeddedTopicVisible, MassAwardBadge, MergeUser, NotifyCategoryChange, NotifyMailingListSubscribers, NotifyMovedPosts, NotifyPostRevision, NotifyReviewable, NotifyTagChange, Onceoff, PostAlert, PostUpdateTopicTrackingState, ProcessBulkInviteEmails, ProcessEmail, ProcessPost, ProcessSnsNotification, PublishGroupMembershipUpdates, PullHotlinkedImages, PushNotification, RebakeCustomEmojiPosts, RebakePostsForUpload, RefreshUsersReviewableCounts, RemoveBanner, RetrieveTopic, RunHeartbeat, RunProblemCheck, Scheduled, SendPushNotification, SendSystemMessage, SuspiciousLogin, SyncAclsForUploads, SyncTopicUserBookmarked, ToggleTopicClosed, TopicActionConverter, TopicTimerBase, TruncateUserFlagStats, UnpinTopic, UpdateGravatar, UpdateGroupMentions, UpdateHotlinkedRaw, UpdatePostUploadsSecureStatus, UpdateTopRedirection, UpdateTopicUploadSecurity, UpdateUsername, UserEmail
Defined Under Namespace
Classes: JobInstrumenter
Class Method Summary
collapse
Instance Method Summary
collapse
Class Method Details
.acquire_cluster_concurrency_lock! ⇒ Object
254
255
256
|
# File 'app/jobs/base.rb', line 254
# Tries to take the cluster-wide lock for this job class.
#
# Issues a Redis SET with nx: true and ex: 120 on the (un-namespaced) key
# returned by cluster_concurrency_redis_key.
#
# Returns true when the SET reply is truthy, false otherwise.
def self.acquire_cluster_concurrency_lock!
  redis = Discourse.redis.without_namespace
  reply = redis.set(cluster_concurrency_redis_key, 0, nx: true, ex: 120)
  reply ? true : false
end
|
.clear_cluster_concurrency_lock! ⇒ Object
250
251
252
|
# File 'app/jobs/base.rb', line 250
# Releases the cluster-wide lock for this job class by deleting the
# (un-namespaced) Redis key returned by cluster_concurrency_redis_key.
#
# Returns whatever the Redis DEL call returns.
def self.clear_cluster_concurrency_lock!
  redis = Discourse.redis.without_namespace
  redis.del(cluster_concurrency_redis_key)
end
|
.cluster_concurrency(val) ⇒ Object
182
183
184
185
|
# File 'app/jobs/base.rb', line 182
# Class-level setting: declares whether this job class is limited to a
# single concurrent run across the cluster.
#
# val - 1 to enable the limit, nil to disable it.
#
# Returns val. Raises ArgumentError for any other value.
def self.cluster_concurrency(val)
  unless val.nil? || val == 1
    raise ArgumentError, "cluster_concurrency must be 1 or nil"
  end

  @cluster_concurrency = val
end
|
.cluster_concurrency_redis_key ⇒ Object
246
247
248
|
# File 'app/jobs/base.rb', line 246
# Redis key used for this job class's cluster concurrency lock:
# the fixed prefix "cluster_concurrency" followed by ":" and the receiver's
# string form.
def self.cluster_concurrency_redis_key
  prefix = "cluster_concurrency"
  "#{prefix}:#{self}"
end
|
.delayed_perform(opts = {}) ⇒ Object
215
216
217
|
# File 'app/jobs/base.rb', line 215
# Builds a fresh instance of this job class and calls #perform on it with
# the given options hash.
#
# opts - options forwarded unchanged to #perform (default: {}).
#
# Returns whatever #perform returns.
def self.delayed_perform(opts = {})
  job = new
  job.perform(opts)
end
|
.get_cluster_concurrency ⇒ Object
187
188
189
|
# File 'app/jobs/base.rb', line 187
# Reads back the value stored in @cluster_concurrency on this class
# (nil when it was never set).
def self.get_cluster_concurrency
  instance_variable_get(:@cluster_concurrency)
end
|
Instance Method Details
#error_context(opts, code_desc = nil, extra = {}) ⇒ Object
Construct an error context object for Discourse.handle_exception. Subclasses are encouraged to use this!
`opts` is the arguments passed to execute(). `code_desc` is a short string describing what the code was doing (optional). `extra` is for any other context you logged. Note that, when building your `extra`, the keys :opts, :job, and :message are used by this method, and :current_db and :current_hostname are used by handle_exception.
206
207
208
209
210
211
212
213
|
# File 'app/jobs/base.rb', line 206
# Builds the context hash that accompanies a job exception.
#
# opts      - the arguments that were passed to #execute.
# code_desc - optional short string describing what the code was doing;
#             stored under :message when given.
# extra     - optional Hash of additional context; merged in last, so its
#             keys win over :opts, :job and :message. nil is accepted and
#             ignored.
#
# Returns a Hash with :opts, :job (the job class), optionally :message,
# plus every key from extra.
def error_context(opts, code_desc = nil, extra = {})
  ctx = { opts: opts, job: self.class }
  ctx[:message] = code_desc if code_desc
  ctx.merge!(extra) unless extra.nil?
  ctx
end
|
#execute(opts = {}) ⇒ Object
219
220
221
|
# File 'app/jobs/base.rb', line 219
# Placeholder for the job's actual work; this base implementation always
# raises so that a job class which does not define its own #execute fails
# loudly.
#
# opts - the job options (unused here).
#
# Raises RuntimeError with the message "Overwrite me!".
def execute(opts = {})
  raise RuntimeError, "Overwrite me!"
end
|
#last_db_duration ⇒ Object
223
224
225
|
# File 'app/jobs/base.rb', line 223
# Returns the value held in @db_duration, or 0 when that is nil/false
# (for example, when it has never been assigned).
def last_db_duration
  return @db_duration if @db_duration

  0
end
|
#log(*args) ⇒ Object
191
192
193
194
195
196
|
# File 'app/jobs/base.rb', line 191
# Writes each argument to Rails.logger at info level, one line per argument,
# formatted as "<YYYY-MM-DD HH:MM:SS>: [<UPCASED JOB CLASS NAME>] <arg>".
#
# args - any number of objects; each is interpolated into its own log line.
#
# Returns true.
def log(*args)
  args.each do |arg|
    # Plain strftime yields the same text as ActiveSupport's :db time format
    # without depending on the deprecated Time#to_formatted_s alias.
    timestamp = Time.now.strftime("%Y-%m-%d %H:%M:%S")
    Rails.logger.info "#{timestamp}: [#{self.class.name.upcase}] #{arg}"
  end
  true
end
|
#perform(*args) ⇒ Object
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
|
# File 'app/jobs/base.rb', line 258
# Runs the job: calls #execute once per target database and reports failures.
#
# args - the job arguments; a trailing Hash is taken as the job options.
#        When opts[:current_site_id] is present only that database is used,
#        otherwise every database from
#        RailsMultisite::ConnectionManagement.all_dbs is visited.
#
# When the job class has cluster concurrency enabled, the cluster lock is
# taken first. If it cannot be acquired the job re-enqueues itself via
# perform_in(10.seconds, ...) and returns without executing. While the job
# runs, a background thread keeps extending the lock's expiry.
#
# Returns nil. Raises HandledExceptionWrapper (wrapping the first collected
# exception) after every collected failure has been passed to
# Discourse.handle_job_exception.
def perform(*args)
  lock_acquired = false
  keepalive_thread = nil
  finished = false

  if self.class.get_cluster_concurrency
    unless self.class.acquire_cluster_concurrency_lock!
      # Someone else holds the lock: try again later instead of running now.
      self.class.perform_in(10.seconds, *args)
      return
    end
    lock_acquired = true

    parent_thread = Thread.current
    cluster_concurrency_redis_key = self.class.cluster_concurrency_redis_key
    keepalive_thread =
      Thread.new do
        while parent_thread.alive? && !finished
          Discourse.redis.without_namespace.expire(cluster_concurrency_redis_key, 120)
          # Sleep in one-second slices so a finished job is noticed quickly.
          60.times do
            break if finished
            sleep 1
          end
        end
      end
  end

  opts = args.extract_options!.with_indifferent_access

  Sidekiq.redis { |r| r.set("last_job_perform_at", Time.now.to_i) } if ::Jobs.run_later?

  dbs =
    if opts[:current_site_id]
      [opts[:current_site_id]]
    else
      RailsMultisite::ConnectionManagement.all_dbs
    end

  exceptions = []
  dbs.each do |db|
    exception = {}

    RailsMultisite::ConnectionManagement.with_connection(db) do
      job_instrumenter =
        JobInstrumenter.new(job_class: self.class, opts: opts, db: db, jid: jid)
      begin
        I18n.locale =
          SiteSetting.default_locale || SiteSettings::DefaultsProvider::DEFAULT_LOCALE
        I18n.ensure_all_loaded!
        begin
          logster_env = {}
          Logster.add_to_env(logster_env, :job, self.class.to_s)
          Logster.add_to_env(logster_env, :db, db)
          Thread.current[Logster::Logger::LOGSTER_ENV] = logster_env

          execute(opts)
        rescue => e
          # Failure inside the job body itself.
          exception[:ex] = e
          exception[:other] = { problem_db: db }
        end
      rescue => e
      # Failure in the per-database setup that precedes #execute.
        exception[:ex] = e
        exception[:message] = "While establishing database connection to #{db}"
        exception[:other] = { problem_db: db }
      ensure
        job_instrumenter.stop(exception: exception)
      end
    end

    exceptions << exception unless exception.empty?
  end

  Thread.current[Logster::Logger::LOGSTER_ENV] = nil

  if exceptions.length > 0
    exceptions.each do |exception_hash|
      Discourse.handle_job_exception(
        exception_hash[:ex],
        # Read the same :message key that is written above so the description
        # actually reaches the error context.
        error_context(opts, exception_hash[:message], exception_hash[:other]),
      )
    end
    raise HandledExceptionWrapper.new(exceptions[0][:ex])
  end

  nil
ensure
  # Only tear down what was actually set up: the lock is released solely when
  # this run acquired it, and the keepalive thread is stopped only if it exists.
  if lock_acquired
    finished = true
    if keepalive_thread
      begin
        keepalive_thread.wakeup
      rescue ThreadError
        # The keepalive thread already saw `finished` and exited.
      end
      keepalive_thread.join
    end
    self.class.clear_cluster_concurrency_lock!
  end
  ActiveRecord::Base.connection_handler.clear_active_connections!
end
|
#perform_immediately(*args) ⇒ Object
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
|
# File 'app/jobs/base.rb', line 227
# Runs the job synchronously by calling #execute directly on the current
# database connection.
#
# args - the job arguments; a trailing Hash is taken as the job options.
#
# Returns the value of #execute, or nil when #execute raised and the
# exception was passed to Discourse.handle_job_exception.
# Raises ArgumentError when opts[:current_site_id] is given and differs from
# RailsMultisite::ConnectionManagement.current_db.
def perform_immediately(*args)
  opts = args.extract_options!.with_indifferent_access

  if opts.has_key?(:current_site_id) &&
       opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db
    raise ArgumentError,
          "You can't connect to another database when executing a job synchronously."
  end

  begin
    execute(opts)
  rescue => exc
    Discourse.handle_job_exception(exc, error_context(opts))
    # A handled failure yields no return value.
    nil
  end
end
|