Class: Jobs::Base
- Inherits:
-
Object
show all
- Includes:
- Sidekiq::Worker
- Defined in:
- app/jobs/base.rb
Direct Known Subclasses
AdminConfirmationEmail, AnonymizeUser, AutomaticGroupMembership, BackfillSidebarSiteSettings, BackupChunksMerger, BulkGrantTrustLevel, BulkInvite, BulkUserTitleUpdate, ChangeDisplayName, ConfirmSnsSubscription, CrawlTopicLink, CreateAvatarThumbnails, CreateBackup, CreateLinkedTopic, CreateUserReviewable, DeleteInaccessibleNotifications, DownloadAvatarFromUrl, DownloadBackupEmail, DownloadProfileBackgroundFromUrl, EmitWebHookEvent, EnableBootstrapMode, ExportCsvFile, ExportUserArchive, FeatureTopicUsers, GenerateTopicThumbnails, GroupPmAlert, GroupPmUpdateSummary, GroupSmtpEmail, IndexCategoryForSearch, InviteEmail, InvitePasswordInstructionsEmail, MakeEmbeddedTopicVisible, MassAwardBadge, MergeUser, NotifyCategoryChange, NotifyMailingListSubscribers, NotifyMovedPosts, NotifyPostRevision, NotifyReviewable, NotifyTagChange, Onceoff, PostAlert, PostUpdateTopicTrackingState, ProcessBulkInviteEmails, ProcessEmail, ProcessPost, ProcessSnsNotification, PublishGroupMembershipUpdates, PullHotlinkedImages, PushNotification, RebakeCustomEmojiPosts, RebakePostsForUpload, RefreshUsersReviewableCounts, RemoveBanner, RetrieveTopic, RunHeartbeat, Scheduled, SendPushNotification, SendSystemMessage, StreamTopicSummary, SuspiciousLogin, SyncAclsForUploads, SyncTopicUserBookmarked, ToggleTopicClosed, TopicActionConverter, TopicTimerBase, TruncateUserFlagStats, UnpinTopic, UpdateGravatar, UpdateGroupMentions, UpdateHotlinkedRaw, UpdatePostUploadsSecureStatus, UpdateS3Inventory, UpdateTopRedirection, UpdateTopicUploadSecurity, UpdateUsername, UserEmail
Defined Under Namespace
Classes: JobInstrumenter
Class Method Summary
collapse
Instance Method Summary
collapse
Class Method Details
.acquire_cluster_concurrency_lock! ⇒ Object
236
237
238
|
# File 'app/jobs/base.rb', line 236
# Tries to take the cluster-wide lock for this job class.
#
# Issues a Redis SET on the un-namespaced connection with `nx: true` (only
# write when the key does not already exist) and `ex: 120` (expire after
# 120 seconds), storing 0 under cluster_concurrency_redis_key.
#
# @return [Boolean] true when the SET reply is truthy, false otherwise
def self.acquire_cluster_concurrency_lock!
  reply =
    Discourse.redis.without_namespace.set(
      cluster_concurrency_redis_key,
      0,
      nx: true,
      ex: 120,
    )
  reply ? true : false
end
|
.clear_cluster_concurrency_lock! ⇒ Object
232
233
234
|
# File 'app/jobs/base.rb', line 232
# Releases the cluster-wide lock for this job class by deleting its key
# (cluster_concurrency_redis_key) on the un-namespaced Redis connection.
#
# @return [Object] whatever the Redis client's `del` call returns
def self.clear_cluster_concurrency_lock!
  redis = Discourse.redis.without_namespace
  redis.del(cluster_concurrency_redis_key)
end
|
.cluster_concurrency(val) ⇒ Object
164
165
166
167
|
# File 'app/jobs/base.rb', line 164
# Class-level setting that declares the cluster concurrency limit for this job.
#
# @param val [Integer, nil] only 1 or nil is accepted
# @return [Integer, nil] the value that was stored
# @raise [ArgumentError] when val is anything other than 1 or nil
def self.cluster_concurrency(val)
  unless val.nil? || val == 1
    raise ArgumentError, "cluster_concurrency must be 1 or nil"
  end

  @cluster_concurrency = val
end
|
.cluster_concurrency_redis_key ⇒ Object
228
229
230
|
# File 'app/jobs/base.rb', line 228
# Redis key used for this job class's cluster concurrency lock.
#
# @return [String] "cluster_concurrency:" followed by the receiver's string form
def self.cluster_concurrency_redis_key
  format("cluster_concurrency:%s", self)
end
|
.delayed_perform(opts = {}) ⇒ Object
197
198
199
|
# File 'app/jobs/base.rb', line 197
# Builds a fresh instance of this job class and runs #perform on it
# with the given options.
#
# @param opts [Hash] options handed to #perform
# @return [Object] whatever #perform returns
def self.delayed_perform(opts = {})
  job = new
  job.perform(opts)
end
|
.get_cluster_concurrency ⇒ Object
169
170
171
|
# File 'app/jobs/base.rb', line 169
# Reads back the value stored by .cluster_concurrency.
#
# @return [Integer, nil] the stored limit, or nil when none has been set
def self.get_cluster_concurrency
  instance_variable_get(:@cluster_concurrency)
end
|
Instance Method Details
#error_context(opts, code_desc = nil, extra = {}) ⇒ Object
Construct an error context object for Discourse.handle_exception. Subclasses are encouraged to use this!
`opts` is the arguments passed to execute(). `code_desc` is a short string describing what the code was doing (optional). `extra` is for any other context you logged. Note that, when building your `extra`, :opts, :job, and :message are used by this method, and :current_db and :current_hostname are used by handle_exception.
188
189
190
191
192
193
194
195
|
# File 'app/jobs/base.rb', line 188
# Builds the context hash that accompanies an exception report.
#
# The result always carries :opts and :job, carries :message when a
# description is given, and is then merged with `extra`. Because `extra` is
# merged last, any :opts, :job or :message keys it contains overwrite the
# values set here.
#
# @param opts [Hash] the arguments the job was executed with
# @param code_desc [String, nil] short description of what the code was doing
# @param extra [Hash, nil] additional context to merge in; nil is ignored
# @return [Hash] the assembled context
def error_context(opts, code_desc = nil, extra = {})
  ctx = { opts: opts, job: self.class }
  ctx[:message] = code_desc if code_desc
  ctx.merge!(extra) unless extra.nil?
  ctx
end
|
#execute(opts = {}) ⇒ Object
201
202
203
|
# File 'app/jobs/base.rb', line 201
# Placeholder for the job's actual work; this base implementation always raises.
#
# @param opts [Hash] job options (unused here)
# @raise [RuntimeError] always, with the message "Overwrite me!"
def execute(opts = {})
  raise RuntimeError, "Overwrite me!"
end
|
#last_db_duration ⇒ Object
205
206
207
|
# File 'app/jobs/base.rb', line 205
# Value of @db_duration, falling back to 0 when it is unset (nil) or false.
#
# @return [Object] @db_duration when truthy, otherwise 0
def last_db_duration
  return 0 unless @db_duration

  @db_duration
end
|
#log(*args) ⇒ Object
173
174
175
176
177
178
|
# File 'app/jobs/base.rb', line 173
# Writes each argument to Rails.logger at info level, one line per argument.
# Every line is prefixed with the current time in :db format and the
# upper-cased name of the job class in square brackets.
#
# @param args [Array<Object>] values to log; each is interpolated into its own line
# @return [true]
def log(*args)
  args.each do |message|
    # Evaluated per message, so each line carries its own timestamp.
    timestamp = Time.now.to_formatted_s(:db)
    tag = self.class.name.upcase
    Rails.logger.info("#{timestamp}: [#{tag}] #{message}")
  end

  true
end
|
#perform(*args) ⇒ Object
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
|
# File 'app/jobs/base.rb', line 240
# Runs the job: calls #execute(opts) once for every target database and
# reports any failures afterwards.
#
# The trailing options hash is taken off `args`. When it contains
# :current_site_id only that database is visited; otherwise every database
# returned by RailsMultisite::ConnectionManagement.all_dbs is visited.
#
# When the job class has cluster concurrency enabled, the cluster lock is
# taken first. If it cannot be acquired, the job is re-enqueued to run 10
# seconds later and this call returns without executing anything. While the
# job runs, a background thread renews the lock's 120 second expiry every
# 60 seconds; the lock is released when the method exits.
#
# @param args [Array] job arguments; a trailing Hash is treated as the options
# @return [nil]
# @raise [HandledExceptionWrapper] wrapping the first collected exception,
#   raised after every collected exception has been passed to
#   Discourse.handle_job_exception
def perform(*args)
  lock_acquired = false
  keepalive_thread = nil
  finished = false

  if self.class.get_cluster_concurrency
    if !self.class.acquire_cluster_concurrency_lock!
      # Lock is taken: retry later. The ensure block below skips the lock
      # cleanup because lock_acquired is still false.
      self.class.perform_in(10.seconds, *args)
      return
    end
    lock_acquired = true

    parent_thread = Thread.current
    cluster_concurrency_redis_key = self.class.cluster_concurrency_redis_key

    # Keep pushing the lock's expiry forward for as long as the job runs.
    keepalive_thread =
      Thread.new do
        while parent_thread.alive? && !finished
          Discourse.redis.without_namespace.expire(cluster_concurrency_redis_key, 120)
          sleep 60
        end
      end
  end

  opts = args.extract_options!.with_indifferent_access

  Sidekiq.redis { |r| r.set("last_job_perform_at", Time.now.to_i) } if ::Jobs.run_later?

  dbs =
    if opts[:current_site_id]
      [opts[:current_site_id]]
    else
      RailsMultisite::ConnectionManagement.all_dbs
    end

  exceptions = []
  dbs.each do |db|
    exception = {}

    RailsMultisite::ConnectionManagement.with_connection(db) do
      job_instrumenter =
        JobInstrumenter.new(job_class: self.class, opts: opts, db: db, jid: jid)
      begin
        I18n.locale =
          SiteSetting.default_locale || SiteSettings::DefaultsProvider::DEFAULT_LOCALE
        I18n.ensure_all_loaded!
        begin
          logster_env = {}
          Logster.add_to_env(logster_env, :job, self.class.to_s)
          Logster.add_to_env(logster_env, :db, db)
          Thread.current[Logster::Logger::LOGSTER_ENV] = logster_env

          execute(opts)
        rescue => e
          # Failure inside the job body itself.
          exception[:ex] = e
          exception[:other] = { problem_db: db }
        end
      rescue => e
        # Failure in the setup steps that run before #execute.
        exception[:ex] = e
        exception[:message] = "While establishing database connection to #{db}"
        exception[:other] = { problem_db: db }
      ensure
        job_instrumenter.stop(exception: exception)
      end
    end

    # Collect instead of raising so the remaining databases still run.
    exceptions << exception unless exception.empty?
  end

  Thread.current[Logster::Logger::LOGSTER_ENV] = nil

  if exceptions.length > 0
    exceptions.each do |exception_hash|
      Discourse.handle_job_exception(
        exception_hash[:ex],
        # The description is stored under :message by the rescue above.
        error_context(opts, exception_hash[:message], exception_hash[:other]),
      )
    end
    raise HandledExceptionWrapper.new(exceptions[0][:ex])
  end

  nil
ensure
  if lock_acquired
    finished = true
    # The thread is nil when an error occurred after the lock was taken but
    # before the thread was started; the lock must still be released.
    if keepalive_thread
      keepalive_thread.wakeup
      keepalive_thread.join
    end
    self.class.clear_cluster_concurrency_lock!
  end
  ActiveRecord::Base.connection_handler.clear_active_connections!
end
|
#perform_immediately(*args) ⇒ Object
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
|
# File 'app/jobs/base.rb', line 209
# Runs the job synchronously on the current database connection.
#
# The trailing options hash is taken off `args` and passed to #execute. An
# exception raised by #execute is handed to Discourse.handle_job_exception
# together with error_context(opts); if that handler returns, the result of
# this method is nil.
#
# @param args [Array] job arguments; a trailing Hash is treated as the options
# @return [Object, nil] the return value of #execute, or nil after a handled error
# @raise [ArgumentError] when :current_site_id is given and differs from
#   RailsMultisite::ConnectionManagement.current_db
def perform_immediately(*args)
  opts = args.extract_options!.with_indifferent_access

  if opts.has_key?(:current_site_id) &&
       opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db
    raise ArgumentError,
          "You can't connect to another database when executing a job synchronously."
  end

  begin
    execute(opts)
  rescue => exc
    Discourse.handle_job_exception(exc, error_context(opts))
    nil
  end
end
|