Class: SidekiqRobustJob::Repository

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq_robust_job/repository.rb

Instance Method Summary collapse

Constructor Details

#initialize(jobs_database:, clock:) ⇒ Repository

Returns a new instance of Repository.



6
7
8
9
# File 'lib/sidekiq_robust_job/repository.rb', line 6

# Stores the two collaborators handed to the repository.
#
# @param jobs_database [Object] kept in +@jobs_database+; presumably an
#   ActiveRecord-style model class (TODO confirm against the caller)
# @param clock [Object] kept in +@clock+; its interface is not visible here
def initialize(jobs_database:, clock:)
  @clock = clock
  @jobs_database = jobs_database
end

Instance Method Details

#build(attributes) ⇒ Object



27
28
29
# File 'lib/sidekiq_robust_job/repository.rb', line 27

# Instantiates a job via +jobs_database.new+; no save call is made here.
#
# @param attributes [Object] forwarded untouched to +jobs_database.new+
#   (presumably an attributes hash)
# @return [Object] whatever +jobs_database.new+ returns
def build(attributes)
  model = jobs_database
  model.new(attributes)
end

#create(attributes) ⇒ Object



23
24
25
# File 'lib/sidekiq_robust_job/repository.rb', line 23

# Creates a job by delegating to +jobs_database.create!+.
#
# @param attributes [Object] forwarded untouched to +create!+
#   (presumably an attributes hash)
# @return [Object] whatever +jobs_database.create!+ returns
# @note Any exception raised by +create!+ propagates to the caller.
def create(attributes)
  model = jobs_database
  model.create!(attributes)
end

#drop_not_started_jobs_by_digest(dropped_by_job_id:, digest:, exclude_id:) ⇒ Object



62
63
64
65
66
67
68
69
# File 'lib/sidekiq_robust_job/repository.rb', line 62

# Drops every job returned by +not_started_for_digest+ for the given digest.
#
# The whole operation runs inside +transaction+. The scope is marked with
# +lock+ before iteration (presumably a row-level lock when the scope is an
# ActiveRecord relation), then each job is dropped and passed to +save+.
#
# @param dropped_by_job_id [Object] forwarded to each job's +drop+
# @param digest [Object] digest used to select the jobs
# @param exclude_id [Object] id left out of the selection
# @return [Object] whatever +transaction+ returns
def drop_not_started_jobs_by_digest(dropped_by_job_id:, digest:, exclude_id:)
  transaction do
    # Use the public `lock` query method; on an ActiveRecord relation `lock!`
    # is the undocumented in-place variant of the same call.
    locked_scope = not_started_for_digest(digest, exclude_id: exclude_id).lock
    locked_scope.find_each do |job|
      job.drop(dropped_by_job_id: dropped_by_job_id)
      save(job)
    end
  end
end

#drop_unprocessed_jobs_by_digest(dropped_by_job_id:, digest:, exclude_id:) ⇒ Object



53
54
55
56
57
58
59
60
# File 'lib/sidekiq_robust_job/repository.rb', line 53

# Drops every job returned by +unprocessed_for_digest+ for the given digest.
#
# The whole operation runs inside +transaction+. The scope is marked with
# +lock+ before iteration (presumably a row-level lock when the scope is an
# ActiveRecord relation), then each job is dropped and passed to +save+.
#
# @param dropped_by_job_id [Object] forwarded to each job's +drop+
# @param digest [Object] digest used to select the jobs
# @param exclude_id [Object] id left out of the selection
# @return [Object] whatever +transaction+ returns
def drop_unprocessed_jobs_by_digest(dropped_by_job_id:, digest:, exclude_id:)
  transaction do
    # Use the public `lock` query method; on an ActiveRecord relation `lock!`
    # is the undocumented in-place variant of the same call.
    locked_scope = unprocessed_for_digest(digest, exclude_id: exclude_id).lock
    locked_scope.find_each do |job|
      job.drop(dropped_by_job_id: dropped_by_job_id)
      save(job)
    end
  end
end

#find(id) ⇒ Object



15
16
17
# File 'lib/sidekiq_robust_job/repository.rb', line 15

# Looks a job up by delegating to +jobs_database.find+.
#
# @param id [Object] identifier forwarded untouched to +find+
# @return [Object] whatever +jobs_database.find+ returns
# @note Any exception raised by +jobs_database.find+ propagates to the caller.
def find(id)
  model = jobs_database
  model.find(id)
end

#missed_jobs(missed_job_policy:) ⇒ Object



31
32
33
34
35
# File 'lib/sidekiq_robust_job/repository.rb', line 31

# Returns the jobs with no +completed_at+, +dropped_at+ or +failed_at+ value
# for which the supplied policy gives a truthy answer.
#
# The policy is an arbitrary callable evaluated in Ruby, so filtering happens
# via +select+ with a block on the result of +where+.
#
# @param missed_job_policy [#call] invoked once per candidate job
# @return [Object] result of +select+ (presumably an Array of jobs)
def missed_jobs(missed_job_policy:)
  candidates = jobs_database.where(completed_at: nil, dropped_at: nil, failed_at: nil)
  candidates.select { |job| missed_job_policy.call(job) }
end

#not_started_for_digest(digest, exclude_id:) ⇒ Object



37
38
39
40
41
42
43
# File 'lib/sidekiq_robust_job/repository.rb', line 37

# Builds the scope of jobs for a digest that have neither +started_at+ nor
# +dropped_at+ set, leaving out the job with the given id.
#
# @param digest [Object] value matched against the +digest+ column
# @param exclude_id [Object] id passed to +where.not+
# @return [Object] the query object returned by +where.not+ (presumably an
#   ActiveRecord relation)
def not_started_for_digest(digest, exclude_id:)
  pending = jobs_database.where(digest: digest, started_at: nil, dropped_at: nil)
  pending.where.not(id: exclude_id)
end

#save(record) ⇒ Object



19
20
21
# File 'lib/sidekiq_robust_job/repository.rb', line 19

# Persists the record with +save!+, but only when it reports changes.
#
# @param record [#changed?, #save!] record to persist
# @return [Object, nil] result of +save!+, or nil when +changed?+ is falsy
# @note Any exception raised by +save!+ propagates to the caller.
def save(record)
  return unless record.changed?

  record.save!
end

#transaction ⇒ Object



11
12
13
# File 'lib/sidekiq_robust_job/repository.rb', line 11

# Runs the given block inside +jobs_database.transaction+.
#
# The caller's block is invoked through +yield+ without arguments.
#
# @yield the work to execute within the transaction
# @return [Object] whatever +jobs_database.transaction+ returns
def transaction
  jobs_database.transaction do
    yield
  end
end

#unprocessed_for_digest(digest, exclude_id:) ⇒ Object



45
46
47
48
49
50
51
# File 'lib/sidekiq_robust_job/repository.rb', line 45

# Builds the scope of jobs for a digest that have neither +completed_at+ nor
# +dropped_at+ set, leaving out the job with the given id.
#
# @param digest [Object] value matched against the +digest+ column
# @param exclude_id [Object] id passed to +where.not+
# @return [Object] the query object returned by +where.not+ (presumably an
#   ActiveRecord relation)
def unprocessed_for_digest(digest, exclude_id:)
  unfinished = jobs_database.where(digest: digest, completed_at: nil, dropped_at: nil)
  unfinished.where.not(id: exclude_id)
end