Class: Plines::JobBatch

Inherits:
Struct
  • Object
show all
Includes:
RedisObjectsHelpers
Defined in:
lib/plines/job_batch.rb

Overview

Represents a group of jobs that are enqueued together as a batch, based on the step dependency graph.

Defined Under Namespace

Classes: SpawnOptions

Constant Summary collapse

JobNotPendingError =
Class.new(ArgumentError)
BATCH_DATA_KEY =
"batch_data"
EXT_DEP_KEYS_KEY =
"ext_dep_keys"
CannotFindExistingJobBatchError =
Class.new(StandardError)
JobBatchAlreadyCreatedError =
Class.new(StandardError)
AddingExternalDependencyNotAllowedError =
Class.new(StandardError)
CannotCancelError =
Class.new(StandardError)

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RedisObjectsHelpers

#declared_redis_object_keys, included, #key_prefix, #new_redis_object

Constructor Details

#initialize(qless, pipeline, id) {|_self| ... } ⇒ JobBatch

Returns a new instance of JobBatch.

Yields:

  • (_self)

Yield Parameters:



21
22
23
24
25
26
27
# File 'lib/plines/job_batch.rb', line 21

# Builds a job batch handle for the given pipeline and id.
#
# @param qless [Object] client object; must respond to #redis
# @param pipeline [Object] forwarded to the superclass constructor
# @param id [Object] forwarded to the superclass constructor
# @yield [self] the new instance, only when a block is given
def initialize(qless, pipeline, id)
  @qless = qless
  @redis = qless.redis
  # Starts out disallowed; #add_job consults this flag before accepting
  # external dependencies.
  @allowed_to_add_external_deps = false
  super(pipeline, id)
  yield self if block_given?
end

Instance Attribute Details

#id ⇒ Object

Returns the value of attribute id

Returns:

  • (Object)

    the current value of id



9
10
11
# File 'lib/plines/job_batch.rb', line 9

# Reader for the id attribute.
#
# @return [Object] the current value of id
def id
  @id
end

#pipeline ⇒ Object

Returns the value of attribute pipeline

Returns:

  • (Object)

    the current value of pipeline



9
10
11
# File 'lib/plines/job_batch.rb', line 9

# Reader for the pipeline attribute.
#
# @return [Object] the current value of pipeline
def pipeline
  @pipeline
end

#qless ⇒ Object (readonly)

Returns the value of attribute qless.



19
20
21
# File 'lib/plines/job_batch.rb', line 19

# Read-only accessor for the qless client handed to the constructor.
#
# @return [Object] the value stored in @qless
def qless
  @qless
end

#redis ⇒ Object (readonly)

Returns the value of attribute redis.



19
20
21
# File 'lib/plines/job_batch.rb', line 19

# Read-only accessor for the redis connection taken from the qless client
# in the constructor.
#
# @return [Object] the value stored in @redis
def redis
  @redis
end

Class Method Details

.create(qless, pipeline, id, batch_data, options = {}) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/plines/job_batch.rb', line 52

# Creates a new job batch and populates its metadata.
#
# @param qless [Object] forwarded to the constructor
# @param pipeline [Object] forwarded to the constructor
# @param id [Object] forwarded to the constructor
# @param batch_data [Object] handed to populate_meta_for_create
# @param options [Hash] handed to populate_meta_for_create
# @yield [batch] the batch, while external dependencies may still be added
# @return [Object] whatever the constructor call returns
# @raise [JobBatchAlreadyCreatedError] if the batch already has a created_at
def self.create(qless, pipeline, id, batch_data, options = {})
  new(qless, pipeline, id) do |batch|
    raise(JobBatchAlreadyCreatedError, "Job batch #{pipeline} / #{id} already exists") if batch.created_at

    # populate_meta_for_create is invoked via send, so it may be non-public.
    batch.send(:populate_meta_for_create, batch_data, options)

    # The caller's block runs inside populate_external_deps_meta, the window
    # in which jobs with external dependencies are accepted.
    batch.populate_external_deps_meta do
      yield batch if block_given?
    end

    batch.meta.delete(:creation_in_progress)
  end
end

.find(qless, pipeline, id) ⇒ Object



38
39
40
41
42
43
44
45
46
47
# File 'lib/plines/job_batch.rb', line 38

# Looks up a job batch that must already exist.
#
# @param qless [Object] forwarded to the constructor
# @param pipeline [Object] forwarded to the constructor
# @param id [Object] forwarded to the constructor
# @yield [batch] the located batch, only when a block is given
# @return [Object] whatever the constructor call returns
# @raise [CannotFindExistingJobBatchError] if the batch has no created_at
def self.find(qless, pipeline, id)
  new(qless, pipeline, id) do |batch|
    # A batch without a creation timestamp is treated as non-existent.
    batch.created_at || raise(
      CannotFindExistingJobBatchError,
      "Cannot find an existing job batch for #{pipeline} / #{id}"
    )

    yield batch if block_given?
  end
end

Instance Method Details

#add_job(jid, *external_dependencies) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/plines/job_batch.rb', line 87

# Registers a job with this batch and creates its EnqueuedJob record.
#
# The external-dependency check runs before anything is written, so a
# rejected call does not leave the jid behind in the pending set.
#
# @param jid [Object] the job id to add to the pending set
# @param external_dependencies [Array] names of external dependencies the
#   job waits on; only accepted while adding external deps is allowed
# @return [Object] the result of EnqueuedJob.create
# @raise [AddingExternalDependencyNotAllowedError] if external dependencies
#   are given while adding them is not allowed
def add_job(jid, *external_dependencies)
  # `any?` is the exact negation of the `none?` check this guard replaces.
  if !@allowed_to_add_external_deps && external_dependencies.any?
    raise AddingExternalDependencyNotAllowedError, "You cannot add jobs " \
      "with external dependencies after creating the job batch."
  end

  pending_job_jids << jid

  external_dependencies.each do |dep|
    newly_added_external_deps << dep
    external_dependency_sets[dep] << jid
  end

  EnqueuedJob.create(qless, pipeline, jid, *external_dependencies)
end

#cancel ⇒ Object



218
219
220
221
# File 'lib/plines/job_batch.rb', line 218

# Cancels the batch unless it is already complete.
#
# @return [false, Object] false when the batch is complete, otherwise the
#   result of perform_cancellation
def cancel
  if complete?
    false
  else
    perform_cancellation
  end
end

#cancel! ⇒ Object



209
210
211
212
213
214
215
216
# File 'lib/plines/job_batch.rb', line 209

# Cancels the batch, raising instead of returning false when that is not
# possible.
#
# @return [Object] the result of perform_cancellation
# @raise [CannotCancelError] if the batch is already complete
def cancel!
  return perform_cancellation unless complete?

  raise CannotCancelError,
    "JobBatch #{id} is already complete and cannot be cancelled"
end

#cancelled? ⇒ Boolean

Returns:

  • (Boolean)


185
186
187
# File 'lib/plines/job_batch.rb', line 185

# Whether the batch metadata carries the cancelled flag.
#
# @return [Boolean] true only when meta["cancelled"] equals the string "1"
def cancelled?
  flag = meta["cancelled"]
  flag == "1"
end

#complete? ⇒ Boolean

Returns:

  • (Boolean)


146
147
148
# File 'lib/plines/job_batch.rb', line 146

# Whether the batch is complete, judged from the current sizes of the
# pending and completed jid collections.
#
# @return [Boolean] the result of _complete? for those two counts
def complete?
  pending_total = pending_job_jids.length
  completed_total = completed_job_jids.length
  _complete?(pending_total, completed_total)
end

#completed_at ⇒ Object



181
182
183
# File 'lib/plines/job_batch.rb', line 181

# Completion timestamp of the batch.
#
# @return [Object] the result of time_from for the "completed_at" key
def completed_at
  time_from("completed_at")
end

#created_at ⇒ Object



177
178
179
# File 'lib/plines/job_batch.rb', line 177

# Creation timestamp of the batch.
#
# @return [Object] the result of time_from for the "created_at" key
def created_at
  time_from("created_at")
end

#creation_in_progress? ⇒ Boolean

Returns:

  • (Boolean)


189
190
191
# File 'lib/plines/job_batch.rb', line 189

# Whether the batch metadata still carries the creation-in-progress flag.
#
# @return [Boolean] true only when meta["creation_in_progress"] equals the
#   string "1"
def creation_in_progress?
  flag = meta["creation_in_progress"]
  flag == "1"
end

#data ⇒ Object



223
224
225
226
# File 'lib/plines/job_batch.rb', line 223

# The batch data stored in the metadata under BATCH_DATA_KEY.
#
# @return [Object, nil] the decoded value wrapped by IndifferentHash.from,
#   or the falsy decoded value itself when there is nothing to wrap
def data
  decoded = decode(meta[BATCH_DATA_KEY])
  return decoded unless decoded

  IndifferentHash.from(decoded)
end

#external_deps ⇒ Object



79
80
81
82
83
84
85
# File 'lib/plines/job_batch.rb', line 79

# The external dependency names stored in the metadata under
# EXT_DEP_KEYS_KEY.
#
# @return [Object] the decoded value, or an empty array when nothing has
#   been stored
def external_deps
  encoded_keys = meta[EXT_DEP_KEYS_KEY]
  return [] unless encoded_keys

  decode(encoded_keys)
end

#has_unresolved_external_dependency?(dep_name) ⇒ Boolean

Returns:

  • (Boolean)


166
167
168
169
170
171
# File 'lib/plines/job_batch.rb', line 166

# Whether any job registered under the given external dependency still
# lists it as unresolved.
#
# @param dep_name [Object] the external dependency name
# @return [Boolean] true if at least one such job includes dep_name in its
#   unresolved_external_dependencies
def has_unresolved_external_dependency?(dep_name)
  dependent_jids = external_dependency_sets[dep_name]

  dependent_jids.any? do |jid|
    job = EnqueuedJob.new(qless, pipeline, jid)
    job.unresolved_external_dependencies.include?(dep_name)
  end
end

#job_jids ⇒ Object



103
104
105
# File 'lib/plines/job_batch.rb', line 103

# All jids in the batch, pending and completed.
#
# @return [Object] the result of combining the pending and completed jid
#   collections with `|`
def job_jids
  pending = pending_job_jids
  completed = completed_job_jids
  pending | completed
end

#job_repository ⇒ Object



111
112
113
# File 'lib/plines/job_batch.rb', line 111

# The job lookup object for this batch; delegates to the qless client.
#
# @return [Object] whatever qless.jobs returns
def job_repository
  qless.jobs
end

#jobs ⇒ Object



107
108
109
# File 'lib/plines/job_batch.rb', line 107

# Builds an EnqueuedJob for every jid in the batch.
#
# @return [Array] one EnqueuedJob per entry of job_jids, in the same order
def jobs
  job_jids.map do |job_id|
    EnqueuedJob.new(qless, pipeline, job_id)
  end
end

#mark_job_as_complete(jid) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/plines/job_batch.rb', line 127

# Moves a jid from the pending collection to the completed collection and,
# if that completes the batch, stamps the completion time and sets the
# expiration.
#
# @param jid [Object] the job id to mark as complete
# @return [Object, nil] the result of set_expiration! when the batch became
#   complete, otherwise nil
# @raise [JobNotPendingError] if the move reports that the jid was not
#   pending
def mark_job_as_complete(jid)
  # The move and both length reads are issued inside a single redis.multi
  # block; its result is destructured into the three values below.
  replies = redis.multi do
    pending_job_jids.move(jid, completed_job_jids)
    pending_job_jids.length
    completed_job_jids.length
  end
  was_pending, pending_total, completed_total = replies

  unless was_pending
    raise JobNotPendingError,
      "Jid #{jid} cannot be marked as complete for " \
      "job batch #{id} since it is not pending"
  end

  return unless _complete?(pending_total, completed_total)

  meta["completed_at"] = Time.now.iso8601
  set_expiration!
end

#newly_added_external_deps ⇒ Object



75
76
77
# File 'lib/plines/job_batch.rb', line 75

# In-memory list of external dependencies added through this instance.
#
# @return [Array] the memoized list, created empty on first access
def newly_added_external_deps
  @newly_added_external_deps = [] unless @newly_added_external_deps
  @newly_added_external_deps
end

#pending_qless_jobs ⇒ Object



115
116
117
118
119
# File 'lib/plines/job_batch.rb', line 115

# Looks up every pending jid in the job repository.
#
# @return [Array] the jobs found, with nil lookups dropped
def pending_qless_jobs
  looked_up = pending_job_jids.map { |jid| job_repository[jid] }
  looked_up.compact
end

#populate_external_deps_meta ⇒ Object



66
67
68
69
70
71
72
73
# File 'lib/plines/job_batch.rb', line 66

# Runs the given block with external-dependency additions allowed, then
# stores the combined dependency list in the metadata as JSON.
#
# @yield invoked with no arguments while additions are allowed
# @return [String] the JSON written to meta[EXT_DEP_KEYS_KEY]
def populate_external_deps_meta
  @allowed_to_add_external_deps = true

  begin
    yield
    combined_deps = external_deps | newly_added_external_deps.to_a
    meta[EXT_DEP_KEYS_KEY] = JSON.dump(combined_deps)
  ensure
    # Close the window again even if the block raised.
    @allowed_to_add_external_deps = false
  end
end

#qless_jobs ⇒ Object



121
122
123
124
125
# File 'lib/plines/job_batch.rb', line 121

# Looks up every jid of the batch in the job repository.
#
# @return [Array] the jobs found, with nil lookups dropped
def qless_jobs
  looked_up = job_jids.map { |jid| job_repository[jid] }
  looked_up.compact
end

#resolve_external_dependency(dep_name) ⇒ Object



150
151
152
153
154
155
156
157
# File 'lib/plines/job_batch.rb', line 150

# Resolves an external dependency for the jobs registered under it, then
# cancels the timeout jobs tracked for that dependency.
#
# @param dep_name [Object] the external dependency name
# @return [Object] the result of cancel_timeout_job_jid_set_for
def resolve_external_dependency(dep_name)
  dependent_jids = external_dependency_sets[dep_name]
  update_external_dependency(dep_name, :resolve_external_dependency, dependent_jids)
  cancel_timeout_job_jid_set_for(dep_name)
end

#spawn_copy {|options| ... } ⇒ Object

Yields:

  • (options)


241
242
243
244
245
246
247
248
249
250
# File 'lib/plines/job_batch.rb', line 241

# Enqueues a new set of jobs for the pipeline, based on this batch's data.
#
# @yield [options] a fresh SpawnOptions whose data_overrides and
#   timeout_reduction are read after the block returns
# @return [Object] the result of pipeline.enqueue_jobs_for
def spawn_copy
  spawn_options = SpawnOptions.new({})
  yield spawn_options if block_given?

  # The overrides go through a JSON dump/parse round trip before merging.
  serialized_overrides = JSON.dump(spawn_options.data_overrides)
  overrides = JSON.parse(serialized_overrides)
  batch_data = data.merge(overrides)

  enqueue_options = {
    spawned_from_id: id,
    timeout_reduction: spawn_options.timeout_reduction || 0
  }

  pipeline.enqueue_jobs_for(batch_data, enqueue_options)
end

#spawned_from ⇒ Object



197
198
199
200
201
202
203
204
205
# File 'lib/plines/job_batch.rb', line 197

# The batch this one was spawned from, if the metadata records one.
#
# The result is memoized, including a nil result, which is why the guard
# uses defined? rather than truthiness.
#
# @return [Object, nil] the batch found for meta["spawned_from_id"], or nil
#   when no such id is stored
def spawned_from
  return @spawned_from if defined?(@spawned_from)

  parent_id = meta["spawned_from_id"]
  @spawned_from = parent_id ? self.class.find(qless, pipeline, parent_id) : nil
end

#timed_out_external_dependencies ⇒ Object



173
174
175
# File 'lib/plines/job_batch.rb', line 173

# Lists the external dependencies recorded in timed_out_external_deps.
#
# @return [Array] the result of calling to_a on timed_out_external_deps
def timed_out_external_dependencies
  timed_out_external_deps.to_a
end

#timeout_external_dependency(dep_name, jids) ⇒ Object



159
160
161
162
163
164
# File 'lib/plines/job_batch.rb', line 159

# Times out an external dependency for the given jobs and records the
# dependency as timed out.
#
# @param dep_name [Object] the external dependency name
# @param jids [Object] a jid or a list of jids; coerced with Array()
# @return [Object] the result of appending dep_name to
#   timed_out_external_deps
def timeout_external_dependency(dep_name, jids)
  jid_list = Array(jids)
  update_external_dependency(dep_name, :timeout_external_dependency, jid_list)
  timed_out_external_deps << dep_name
end

#timeout_job_jid_sets ⇒ Object



232
233
234
235
236
237
# File 'lib/plines/job_batch.rb', line 232

# Lazily built lookup of Redis::Set objects, one per external dependency.
#
# @return [Hash] a memoized hash whose default block creates and caches a
#   Redis::Set keyed by "<key_prefix>:timeout_job_jids:<dep>"
def timeout_job_jid_sets
  return @timeout_job_jid_sets if @timeout_job_jid_sets

  @timeout_job_jid_sets = Hash.new do |sets, dep|
    redis_key = [key_prefix, "timeout_job_jids", dep].join(':')
    sets[dep] = Redis::Set.new(redis_key, redis)
  end
end

#timeout_reduction ⇒ Object



193
194
195
# File 'lib/plines/job_batch.rb', line 193

# The timeout reduction stored in the batch metadata.
#
# @return [Integer] meta["timeout_reduction"] converted with to_i,
#   memoized after the first read
def timeout_reduction
  return @timeout_reduction if @timeout_reduction

  @timeout_reduction = meta["timeout_reduction"].to_i
end

#track_timeout_job(dep_name, jid) ⇒ Object



228
229
230
# File 'lib/plines/job_batch.rb', line 228

# Records a timeout job's jid under the given external dependency.
#
# @param dep_name [Object] the external dependency name
# @param jid [Object] the jid of the timeout job
# @return [Object] the result of appending jid to that dependency's set
def track_timeout_job(dep_name, jid)
  jid_set = timeout_job_jid_sets[dep_name]
  jid_set << jid
end