Class: Jiggler::Scheduled::Enqueuer
Constant Summary
collapse
# Lua script taking one sorted-set key (KEYS[1]) and a score bound (ARGV[1], `now`).
# It looks up at most one member scored between -inf and `now`; if one exists it is
# removed from the set and returned, otherwise the script returns nothing.
- LUA_ZPOPBYSCORE =
<<~LUA
local key, now = KEYS[1], ARGV[1]
local jobs = redis.call('zrangebyscore', key, '-inf', now, 'limit', 0, 1)
if jobs[1] then
redis.call('zrem', key, jobs[1])
return jobs[1]
end
LUA
Instance Method Summary
collapse
#log_error, #log_error_short, #logger, #safe_async, #scan_all, #tid
Constructor Details
#initialize(config) ⇒ Enqueuer
Returns a new instance of Enqueuer.
17
18
19
20
21
22
|
# File 'lib/jiggler/scheduled/enqueuer.rb', line 17
# Builds an enqueuer bound to the given configuration.
#
# @param config the configuration object; enqueue_jobs and push_job call
#   with_async_redis, default_queue and queue_prefix on it
def initialize(config)
@config = config
# Stop flag: enqueue_jobs checks it while draining, terminate sets it to true.
@done = false
# Starts unset; presumably caches the SHA of the loaded LUA_ZPOPBYSCORE script —
# confirm where zpopbyscore is defined.
@lua_zpopbyscore_sha = nil
# Identifier returned by the tid helper; passed as tid: when errors are logged.
@tid = tid
end
|
Instance Method Details
#enqueue_jobs ⇒ Object
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
# File 'lib/jiggler/scheduled/enqueuer.rb', line 24
# Drains the scheduled sorted sets, handing each popped job to push_job.
#
# For every key returned by sorted_sets, jobs are popped one at a time through
# zpopbyscore (given the current time as the score bound) until either no job
# is returned or the enqueuer has been terminated.
#
# The @done flag is checked BEFORE each pop. This ordering matters: a job that
# is popped after termination would already be removed from its sorted set but
# never pushed to a queue, i.e. silently lost.
#
# Any error raised while draining is reported through log_error_short instead
# of being propagated to the caller.
def enqueue_jobs
  @config.with_async_redis do |conn|
    sorted_sets.each do |sorted_set|
      until @done
        # The timestamp is re-read for every pop so jobs that become due
        # while the set is being drained are picked up as well.
        job_args = zpopbyscore(conn, key: sorted_set, argv: Time.now.to_f.to_s)
        break unless job_args

        push_job(conn, job_args)
      end
    end
  rescue => err
    log_error_short(err, context: '\'Enqueuing jobs error\'', tid: @tid)
  end
end
|
#push_job(conn, job_args) ⇒ Object
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/jiggler/scheduled/enqueuer.rb', line 43
# Pushes one serialized job onto the list of the queue it names.
#
# The payload is parsed with Oj only to read its 'queue' entry; when that entry
# is missing, the configured default queue is used. The list key is the
# configured queue prefix followed by the queue name, and the original
# serialized payload (not the parsed one) is what gets LPUSHed.
#
# Failures are reported through log_error_short and not re-raised. If the
# failure happens before the list key is computed, the logged queue is nil.
#
# @param conn the Redis connection; must respond to call
# @param job_args [String] the serialized job payload
def push_job(conn, job_args)
  payload = Oj.load(job_args, mode: :compat)
  queue_name = payload['queue'] || @config.default_queue
  list_name = "#{@config.queue_prefix}#{queue_name}"
  conn.call('LPUSH', list_name, job_args)
rescue => err
  details = {
    context: '\'Pushing scheduled job error\'',
    tid: @tid,
    job_args: job_args,
    queue: list_name
  }
  log_error_short(err, details)
end
|
#terminate ⇒ Object
39
40
41
|
# File 'lib/jiggler/scheduled/enqueuer.rb', line 39
# Flags the enqueuer as done. enqueue_jobs reads this flag in its loop
# condition, so setting it makes an in-progress drain stop.
def terminate
@done = true
end
|