Class: Sbmt::Outbox::BaseDeleteStaleItemsJob

Inherits:
Object
  • Object
show all
Defined in:
app/jobs/sbmt/outbox/base_delete_stale_items_job.rb

Constant Summary collapse

LOCK_TTL =
10_800_000

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#item_classObject

Returns the value of attribute item_class.



26
27
28
# File 'app/jobs/sbmt/outbox/base_delete_stale_items_job.rb', line 26

def item_class
  @item_class
end

#lock_timerObject

Returns the value of attribute lock_timer.



26
27
28
# File 'app/jobs/sbmt/outbox/base_delete_stale_items_job.rb', line 26

def lock_timer
  @lock_timer
end

Class Method Details

.enqueueObject



11
12
13
14
15
16
# File 'app/jobs/sbmt/outbox/base_delete_stale_items_job.rb', line 11

def enqueue
  item_classes.each do |item_class|
    delay = rand(15).seconds
    set(wait: delay).perform_later(item_class.to_s)
  end
end

.item_classesObject

Raises:

  • (NotImplementedError)


18
19
20
# File 'app/jobs/sbmt/outbox/base_delete_stale_items_job.rb', line 18

def item_classes
  raise NotImplementedError
end

Instance Method Details

#perform(item_class_name) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'app/jobs/sbmt/outbox/base_delete_stale_items_job.rb', line 28

def perform(item_class_name)
  self.item_class = item_class_name.constantize

  client = if Gem::Version.new(Redlock::VERSION) >= Gem::Version.new("2.0.0")
    Sbmt::Outbox.redis
  else
    Redis.new(config.redis)
  end

  self.lock_timer = Cutoff.new(LOCK_TTL / 1000)
  lock_manager = Redlock::Client.new([client], retry_count: 0)

  lock_manager.lock("#{self.class.name}:#{item_class_name}:lock", LOCK_TTL) do |locked|
    if locked
      duration_failed = item_class.config.retention
      duration_delivered = item_class.config.retention_delivered_items

      validate_retention!(duration_delivered, duration_failed)

      logger.with_tags(box_type: box_type, box_name: box_name) do
        delete_stale_items(Time.current - duration_failed, Time.current - duration_delivered)
      end
    else
      logger.log_info("Failed to acquire lock #{self.class.name}:#{item_class_name}")
    end
  end
rescue Cutoff::CutoffExceededError
  logger.log_info("Lock timeout while processing #{item_class_name}")
end