Class: SidekiqReliableRequeue::Worker

Inherits:
Object
  • Object
show all
Includes:
Sidekiq::Worker
Defined in:
lib/sidekiq-reliable-requeue/worker.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.requeue_stale_jobs ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/sidekiq-reliable-requeue/worker.rb', line 22

# Schedules a delayed requeue for every entry in the stale-jobs hash.
#
# Each field of the hash stored at SidekiqReliableStaleJobsKey is expected to
# hold a JSON-encoded Sidekiq message. For every field that still has a value,
# this worker is enqueued through +perform_in+, using the message's
# 'requeue_timeout' as the delay and the hash field name as the argument.
#
# @return [Array] the field names returned by +hkeys+
def self.requeue_stale_jobs
  keys = RedisConnection.hkeys(SidekiqReliableStaleJobsKey)

  Sidekiq.logger.info('No stale jobs to add') if keys.empty?

  keys.each do |key|
    # Read the entry once and parse that exact value. Fetching it a second
    # time costs an extra round-trip and raises in JSON.parse if the entry
    # is removed between the two reads.
    data = RedisConnection.hget(SidekiqReliableStaleJobsKey, key)
    next unless data

    requeue_timeout = JSON.parse(data)['requeue_timeout']

    Sidekiq.logger.info("Preparing job #{key} to be requeued in #{requeue_timeout}s")
    perform_in(requeue_timeout, key)
  end
end

Instance Method Details

#perform(key) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/sidekiq-reliable-requeue/worker.rb', line 5

# Pushes the stale job stored under +key+ back to Sidekiq and then removes
# it from the hash at SidekiqReliableStaleJobsKey.
#
# When the hash has no value for +key+, only debug messages are logged and
# nothing is pushed or deleted.
#
# @param key [String] field name in the stale-jobs hash
# @return [Object, nil] the result of the +hdel+ call, or nil when no entry
#   was found
def perform(key)
  Sidekiq.logger.debug("Checking for data with key: #{key}")
  data = RedisConnection.hget(SidekiqReliableStaleJobsKey, key)

  unless data
    Sidekiq.logger.debug("Job with key #{key} has been cleaned")
    return
  end

  # Parse the value that was just checked for presence. Reading the field a
  # second time is an extra round-trip and can return nil if the entry was
  # removed in between, which would make JSON.parse raise.
  sidekiq_msg = JSON.parse(data)
  worker      = sidekiq_msg['class'].constantize

  Sidekiq.logger.info("Requeuing #{sidekiq_msg}")
  # The stored 'jid' is stripped before pushing — presumably so the push
  # assigns a fresh job id; confirm against client_push.
  worker.client_push(sidekiq_msg.except('jid'))
  # Delete only after the push, so an exception during the push leaves the
  # entry in the hash.
  RedisConnection.hdel(SidekiqReliableStaleJobsKey, key)
end