Class: Sidekiq::Scheduled::Enq
- Inherits: Object
  - Object
    - Sidekiq::Scheduled::Enq
- Includes: Component
- Defined in: lib/sidekiq/scheduled.rb
Constant Summary
- LUA_ZPOPBYSCORE =
    <<~LUA
      local key, now = KEYS[1], ARGV[1]
      local jobs = redis.call("zrange", key, "-inf", now, "byscore", "limit", 0, 1)
      if jobs[1] then
        redis.call("zrem", key, jobs[1])
        return jobs[1]
      end
    LUA
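The script above reads at most one member whose score is at or below `now`, removes it, and returns it, all inside a single Redis script call. As a rough illustration of that behavior, here is a pure-Ruby model. `FakeSortedSet` and `pop_by_score` are hypothetical names invented for this sketch; they are not part of Sidekiq or Redis, and the model ignores everything about Redis except the ordering by score.

```ruby
# Hypothetical in-memory stand-in for a Redis sorted set (not a Sidekiq class).
class FakeSortedSet
  def initialize
    @scores = {} # member => score
  end

  def add(member, score)
    @scores[member] = score.to_f
  end

  def size
    @scores.size
  end

  # Models the Lua script: pick at most one member with score <= now,
  # remove it, and return it. Returns nil when nothing is due yet.
  def pop_by_score(now)
    due = @scores.select { |_member, score| score <= now.to_f }
    return nil if due.empty?

    # Lowest score first; ties are broken by member name.
    member, _score = due.min_by { |m, s| [s, m] }
    @scores.delete(member)
    member
  end
end

set = FakeSortedSet.new
set.add("job-a", 100.0)
set.add("job-b", 50.0)
set.add("job-c", 500.0)

first  = set.pop_by_score(200.0) # lowest due score
second = set.pop_by_score(200.0) # next due member
third  = set.pop_by_score(200.0) # job-c is not due yet
```

In this model, each call hands back one due member at a time, which is the property the `enqueue_jobs` loop relies on.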
Instance Attribute Summary
Attributes included from Component
Instance Method Summary
- #enqueue_jobs(sorted_sets = SETS) ⇒ Object
- #initialize(container) ⇒ Enq (constructor). A new instance of Enq.
- #terminate ⇒ Object
Methods included from Component
#fire_event, #handle_exception, #hostname, #identity, #logger, #process_nonce, #redis, #safe_thread, #tid, #watchdog
Constructor Details
Instance Method Details
#enqueue_jobs(sorted_sets = SETS) ⇒ Object
    # File 'lib/sidekiq/scheduled.rb', line 29

    def enqueue_jobs(sorted_sets = SETS)
      # A job's "score" in Redis is the time at which it should be processed.
      # Just check Redis for the set of jobs with a timestamp before now.
      redis do |conn|
        sorted_sets.each do |sorted_set|
          # Get next item in the queue with score (time to execute) <= now.
          # We need to go through the list one at a time to reduce the risk of something
          # going wrong between the time jobs are popped from the scheduled queue and when
          # they are pushed onto a work queue and losing the jobs.
          while !@done && (job = zpopbyscore(conn, keys: [sorted_set], argv: [Time.now.to_f.to_s]))
            @client.push(Sidekiq.load_json(job))
            logger.debug { "enqueued #{sorted_set}: #{job}" }
          end
        end
      end
    end
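The loop in `enqueue_jobs` pops one due job at a time from each sorted set and pushes it before popping the next one. A minimal sketch of that shape, runnable without Redis, might look like the following. `MiniEnq`, `pop_due`, and the `pushed` array are assumptions made for illustration: the real method pops via `zpopbyscore` and pushes through `@client`, and `JSON.parse` stands in here for `Sidekiq.load_json`.

```ruby
require "json"

# Hypothetical model of the drain loop; none of these names are Sidekiq APIs.
class MiniEnq
  attr_reader :pushed

  # scheduled: { set_name => [[score, json_payload], ...] }
  def initialize(scheduled)
    @scheduled = scheduled
    @pushed = []   # stands in for the client that pushes to work queues
    @done = false
  end

  def terminate
    @done = true
  end

  def enqueue_jobs(now = Time.now.to_f)
    @scheduled.each_key do |set_name|
      # One job per iteration: pop it, then push it, then look again.
      while !@done && (job = pop_due(set_name, now))
        @pushed << JSON.parse(job)
      end
    end
  end

  private

  # Removes and returns the payload with the lowest score <= now, or nil.
  def pop_due(set_name, now)
    entries = @scheduled[set_name]
    due = entries.select { |score, _payload| score <= now }.min_by(&:first)
    return nil unless due

    entries.delete(due)
    due.last
  end
end

scheduled = {
  "schedule" => [[10.0, '{"class":"A"}'], [99.0, '{"class":"B"}']],
  "retry"    => [[5.0, '{"class":"C"}']]
}
enq = MiniEnq.new(scheduled)
enq.enqueue_jobs(50.0)
pushed_classes = enq.pushed.map { |job| job["class"] }
```

With `now` set to 50.0, the jobs scored 10.0 and 5.0 are moved and the job scored 99.0 stays in its set for a later pass.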
#terminate ⇒ Object
    # File 'lib/sidekiq/scheduled.rb', line 46

    def terminate
      @done = true
    end
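`terminate` only sets `@done`; the `while !@done` condition in `enqueue_jobs` then stops the loop at its next check. The sketch below shows that cooperative-stop pattern in isolation, assuming the flag is set from a different thread than the one running the loop. `StoppableLoop` is a hypothetical class for this example, and the sleep intervals are arbitrary.

```ruby
# Hypothetical stand-in; only the @done flag pattern comes from the source above.
class StoppableLoop
  attr_reader :iterations

  def initialize
    @done = false
    @iterations = 0
  end

  # Keeps working until another caller flips the flag.
  def run
    until @done
      @iterations += 1
      sleep 0.01 # placeholder for one unit of work
    end
  end

  def terminate
    @done = true
  end
end

worker = StoppableLoop.new
thread = Thread.new { worker.run }
sleep 0.05              # let the loop run briefly
worker.terminate        # request a stop; the loop exits at its next check
finished = thread.join(1) # returns the thread if it stopped within 1 second
```

The stop is not immediate: whatever unit of work is in progress finishes first, which matches the source loop finishing its current push before re-checking `@done`.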