Class: Sidekiq::Scheduled::Enq
- Inherits: Object
- Inheritance hierarchy: Object → Sidekiq::Scheduled::Enq
- Defined in:
- lib/sidekiq/scheduled.rb
Instance Method Summary collapse
Instance Method Details
#enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = SETS) ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/sidekiq/scheduled.rb', line 12

# Moves every due job from the given Redis sorted sets onto its work queue.
#
# For each sorted set, repeatedly fetches up to 100 members whose score is
# <= +now+, removes each one from the set, and pushes the decoded payload
# through Sidekiq::Client.push. The fetch loop for a set ends once the range
# query comes back empty.
#
# @param now [String] upper score bound for the range query; defaults to the
#   current epoch time (float seconds) rendered as a string
# @param sorted_sets [#each] names of the sorted sets to drain; defaults to
#   the SETS constant (defined outside this snippet)
# @return [Object] whatever the Sidekiq.redis block call returns
def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = SETS)
  # A job's "score" in Redis is the time at which it should be processed.
  # Just check Redis for the set of jobs with a timestamp before now.
  Sidekiq.redis do |conn|
    sorted_sets.each do |sorted_set|
      # Get next items in the queue with scores (time to execute) <= now.
      until (jobs = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 100])).empty?
        # We need to go through the list one at a time to reduce the risk of something
        # going wrong between the time jobs are popped from the scheduled queue and when
        # they are pushed onto a work queue and losing the jobs.
        jobs.each do |job|
          # Pop item off the queue and add it to the work queue. If the job can't be
          # popped from the queue, it's because another process already popped it so
          # we can move on to the next one.
          removed = conn.zrem(sorted_set, job)

          # A client may report the removal as a Boolean or as an Integer count.
          # 0 is truthy in Ruby, so it must be rejected explicitly or a job that
          # another process already claimed would be enqueued a second time.
          next unless removed && removed != 0

          Sidekiq::Client.push(Sidekiq.load_json(job))
          Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
        end
      end
    end
  end
end