5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
# File 'lib/canvas_sync/job_batches/jobs/managed_batch_job.rb', line 5
def self.make_batch(sub_jobs, ordered: true, concurrency: nil, context: nil, preflight_check: nil, desc_prefix: nil, &blk)
desc_prefix ||= ''
if concurrency == 0 || concurrency == nil || concurrency == true
concurrency = sub_jobs.count
elsif concurrency == false
concurrency = 1
end
root_batch = Batch.new
man_batch_id = nil
if concurrency < sub_jobs.count
man_batch_id = SecureRandom.urlsafe_base64(10)
Batch.redis do |r|
r.multi do |r|
r.hset("MNGBID-#{man_batch_id}", "root_bid", root_batch.bid)
r.hset("MNGBID-#{man_batch_id}", "ordered", ordered ? 1 : 0)
r.hset("MNGBID-#{man_batch_id}", "concurrency", concurrency)
r.hset("MNGBID-#{man_batch_id}", "preflight_check", preflight_check) if preflight_check.present?
r.expire("MNGBID-#{man_batch_id}", Batch::BID_EXPIRE_TTL)
mapped_sub_jobs = sub_jobs.each_with_index.map do |j, i|
j['_mngbid_index_'] = i j = ::ActiveJob::Arguments.serialize([j])
JSON.unparse(j)
end
if ordered
r.rpush("MNGBID-#{man_batch_id}-jobs", mapped_sub_jobs)
else
r.sadd("MNGBID-#{man_batch_id}-jobs", mapped_sub_jobs)
end
r.expire("MNGBID-#{man_batch_id}-jobs", Batch::BID_EXPIRE_TTL)
end
end
root_batch.allow_context_changes = (concurrency == 1)
root_batch.on(:success, "#{to_s}.cleanup_redis", managed_batch_id: man_batch_id)
desc_prefix = "MGD(#{man_batch_id}): #{desc_prefix}"
end
root_batch.context = context
blk.call(ManagedBatchProxy.new(root_batch)) if blk.present?
root_batch.description = "#{desc_prefix}#{root_batch.description || 'Root'}"
root_batch.context["managed_batch_bid"] = man_batch_id if man_batch_id
if concurrency < sub_jobs.count
root_batch.placeholder!
concurrency.times do
perform_next_sequence_job(man_batch_id, skip_preflight: true)
end
else
root_batch.jobs do
sub_jobs.each do |j|
ChainBuilder.enqueue_job(j)
end
end
end
root_batch
end
|