Class: Plines::JobBatch
Overview
Represents a group of jobs that are enqueued together as a batch, based on the step dependency graph.
Defined Under Namespace
Classes: SpawnOptions
Constant Summary
collapse
- JobNotPendingError =
Class.new(ArgumentError)
- BATCH_DATA_KEY =
"batch_data"
- EXT_DEP_KEYS_KEY =
"ext_dep_keys"
- CannotFindExistingJobBatchError =
Class.new(StandardError)
- JobBatchAlreadyCreatedError =
Class.new(StandardError)
- AddingExternalDependencyNotAllowedError =
Class.new(StandardError)
- CannotCancelError =
Class.new(StandardError)
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
#declared_redis_object_keys, included, #key_prefix, #new_redis_object
Constructor Details
#initialize(qless, pipeline, id) {|_self| ... } ⇒ JobBatch
Returns a new instance of JobBatch.
21
22
23
24
25
26
27
|
# File 'lib/plines/job_batch.rb', line 21
def initialize(qless, pipeline, id)
@qless = qless
@redis = qless.redis
@allowed_to_add_external_deps = false
super(pipeline, id)
yield self if block_given?
end
|
Instance Attribute Details
#id ⇒ Object
Returns the value of attribute id
9
10
11
|
# File 'lib/plines/job_batch.rb', line 9
def id
@id
end
|
#pipeline ⇒ Object
Returns the value of attribute pipeline
9
10
11
|
# File 'lib/plines/job_batch.rb', line 9
def pipeline
@pipeline
end
|
#qless ⇒ Object
Returns the value of attribute qless.
19
20
21
|
# File 'lib/plines/job_batch.rb', line 19
def qless
@qless
end
|
#redis ⇒ Object
Returns the value of attribute redis.
19
20
21
|
# File 'lib/plines/job_batch.rb', line 19
def redis
@redis
end
|
Class Method Details
.create(qless, pipeline, id, batch_data, options = {}) ⇒ Object
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/plines/job_batch.rb', line 52
def self.create(qless, pipeline, id, batch_data, options = {})
new(qless, pipeline, id) do |inst|
if inst.created_at
raise JobBatchAlreadyCreatedError,
"Job batch #{pipeline} / #{id} already exists"
end
inst.send(:populate_meta_for_create, batch_data, options)
inst.populate_external_deps_meta { yield inst if block_given? }
inst.meta.delete(:creation_in_progress)
end
end
|
.find(qless, pipeline, id) ⇒ Object
38
39
40
41
42
43
44
45
46
47
|
# File 'lib/plines/job_batch.rb', line 38
def self.find(qless, pipeline, id)
new(qless, pipeline, id) do |inst|
unless inst.created_at
raise CannotFindExistingJobBatchError,
"Cannot find an existing job batch for #{pipeline} / #{id}"
end
yield inst if block_given?
end
end
|
Instance Method Details
#add_job(jid, *external_dependencies) ⇒ Object
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
# File 'lib/plines/job_batch.rb', line 87
def add_job(jid, *external_dependencies)
pending_job_jids << jid
unless @allowed_to_add_external_deps || external_dependencies.none?
raise AddingExternalDependencyNotAllowedError, "You cannot add jobs " +
"with external dependencies after creating the job batch."
else
external_dependencies.each do |dep|
newly_added_external_deps << dep
external_dependency_sets[dep] << jid
end
EnqueuedJob.create(qless, pipeline, jid, *external_dependencies)
end
end
|
#cancel ⇒ Object
218
219
220
221
|
# File 'lib/plines/job_batch.rb', line 218
def cancel
return false if complete?
perform_cancellation
end
|
#cancel! ⇒ Object
209
210
211
212
213
214
215
216
|
# File 'lib/plines/job_batch.rb', line 209
def cancel!
if complete?
raise CannotCancelError,
"JobBatch #{id} is already complete and cannot be cancelled"
end
perform_cancellation
end
|
#cancelled? ⇒ Boolean
185
186
187
|
# File 'lib/plines/job_batch.rb', line 185
def cancelled?
meta["cancelled"] == "1"
end
|
#complete? ⇒ Boolean
146
147
148
|
# File 'lib/plines/job_batch.rb', line 146
def complete?
_complete?(pending_job_jids.length, completed_job_jids.length)
end
|
#completed_at ⇒ Object
181
182
183
|
# File 'lib/plines/job_batch.rb', line 181
def completed_at
time_from "completed_at"
end
|
#created_at ⇒ Object
177
178
179
|
# File 'lib/plines/job_batch.rb', line 177
def created_at
time_from "created_at"
end
|
#creation_in_progress? ⇒ Boolean
189
190
191
|
# File 'lib/plines/job_batch.rb', line 189
def creation_in_progress?
meta["creation_in_progress"] == "1"
end
|
#external_deps ⇒ Object
79
80
81
82
83
84
85
|
# File 'lib/plines/job_batch.rb', line 79
def external_deps
if keys = meta[EXT_DEP_KEYS_KEY]
decode(keys)
else
[]
end
end
|
#has_unresolved_external_dependency?(dep_name) ⇒ Boolean
166
167
168
169
170
171
|
# File 'lib/plines/job_batch.rb', line 166
def has_unresolved_external_dependency?(dep_name)
external_dependency_sets[dep_name].any? do |jid|
EnqueuedJob.new(qless, pipeline, jid)
.unresolved_external_dependencies.include?(dep_name)
end
end
|
#job_jids ⇒ Object
103
104
105
|
# File 'lib/plines/job_batch.rb', line 103
def job_jids
pending_job_jids | completed_job_jids
end
|
#job_repository ⇒ Object
111
112
113
|
# File 'lib/plines/job_batch.rb', line 111
def job_repository
qless.jobs
end
|
#jobs ⇒ Object
107
108
109
|
# File 'lib/plines/job_batch.rb', line 107
def jobs
job_jids.map { |jid| EnqueuedJob.new(qless, pipeline, jid) }
end
|
#mark_job_as_complete(jid) ⇒ Object
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
# File 'lib/plines/job_batch.rb', line 127
def mark_job_as_complete(jid)
moved, pending_count, complete_count = redis.multi do
pending_job_jids.move(jid, completed_job_jids)
pending_job_jids.length
completed_job_jids.length
end
unless moved
raise JobNotPendingError,
"Jid #{jid} cannot be marked as complete for " +
"job batch #{id} since it is not pending"
end
if _complete?(pending_count, complete_count)
meta["completed_at"] = Time.now.iso8601
set_expiration!
end
end
|
#newly_added_external_deps ⇒ Object
75
76
77
|
# File 'lib/plines/job_batch.rb', line 75
def newly_added_external_deps
@newly_added_external_deps ||= []
end
|
#pending_qless_jobs ⇒ Object
115
116
117
118
119
|
# File 'lib/plines/job_batch.rb', line 115
def pending_qless_jobs
pending_job_jids.map do |jid|
job_repository[jid]
end.compact
end
|
66
67
68
69
70
71
72
73
|
# File 'lib/plines/job_batch.rb', line 66
def populate_external_deps_meta
@allowed_to_add_external_deps = true
yield
ext_deps = external_deps | newly_added_external_deps.to_a
meta[EXT_DEP_KEYS_KEY] = JSON.dump(ext_deps)
ensure
@allowed_to_add_external_deps = false
end
|
#qless_jobs ⇒ Object
121
122
123
124
125
|
# File 'lib/plines/job_batch.rb', line 121
def qless_jobs
job_jids.map do |jid|
job_repository[jid]
end.compact
end
|
#resolve_external_dependency(dep_name) ⇒ Object
150
151
152
153
154
155
156
157
|
# File 'lib/plines/job_batch.rb', line 150
def resolve_external_dependency(dep_name)
jids = external_dependency_sets[dep_name]
update_external_dependency \
dep_name, :resolve_external_dependency, jids
cancel_timeout_job_jid_set_for(dep_name)
end
|
#spawn_copy {|options| ... } ⇒ Object
241
242
243
244
245
246
247
248
249
250
|
# File 'lib/plines/job_batch.rb', line 241
def spawn_copy
options = SpawnOptions.new({})
yield options if block_given?
overrides = JSON.parse(JSON.dump options.data_overrides)
pipeline.enqueue_jobs_for(data.merge(overrides), {
spawned_from_id: id,
timeout_reduction: options.timeout_reduction || 0
})
end
|
#spawned_from ⇒ Object
197
198
199
200
201
202
203
204
205
|
# File 'lib/plines/job_batch.rb', line 197
def spawned_from
return @spawned_from if defined?(@spawned_from)
if id = meta["spawned_from_id"]
@spawned_from = self.class.find(qless, pipeline, id)
else
@spawned_from = nil
end
end
|
#timed_out_external_dependencies ⇒ Object
173
174
175
|
# File 'lib/plines/job_batch.rb', line 173
def timed_out_external_dependencies
timed_out_external_deps.to_a
end
|
#timeout_external_dependency(dep_name, jids) ⇒ Object
159
160
161
162
163
164
|
# File 'lib/plines/job_batch.rb', line 159
def timeout_external_dependency(dep_name, jids)
update_external_dependency \
dep_name, :timeout_external_dependency, Array(jids)
timed_out_external_deps << dep_name
end
|
#timeout_job_jid_sets ⇒ Object
232
233
234
235
236
237
|
# File 'lib/plines/job_batch.rb', line 232
def timeout_job_jid_sets
@timeout_job_jid_sets ||= Hash.new do |hash, dep|
key = [key_prefix, "timeout_job_jids", dep].join(':')
hash[dep] = Redis::Set.new(key, redis)
end
end
|
#timeout_reduction ⇒ Object
193
194
195
|
# File 'lib/plines/job_batch.rb', line 193
def timeout_reduction
@timeout_reduction ||= meta["timeout_reduction"].to_i
end
|
#track_timeout_job(dep_name, jid) ⇒ Object
228
229
230
|
# File 'lib/plines/job_batch.rb', line 228
def track_timeout_job(dep_name, jid)
timeout_job_jid_sets[dep_name] << jid
end
|