Class: Ci::PipelineProcessing::AtomicProcessingService

Inherits:
Object
  • Object
show all
Includes:
ExclusiveLeaseGuard, Gitlab::Utils::StrongMemoize
Defined in:
app/services/ci/pipeline_processing/atomic_processing_service.rb,
app/services/ci/pipeline_processing/atomic_processing_service/status_collection.rb

Defined Under Namespace

Classes: StatusCollection

Constant Summary collapse

DEFAULT_LEASE_TIMEOUT =
1.minute
BATCH_SIZE =
20

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ExclusiveLeaseGuard

#exclusive_lease, #lease_release?, #lease_taken_message, #log_lease_taken, #release_lease, #renew_lease!, #try_obtain_lease

Constructor Details

#initialize(pipeline) ⇒ AtomicProcessingService

Returns a new instance of AtomicProcessingService.



14
15
16
17
# File 'app/services/ci/pipeline_processing/atomic_processing_service.rb', line 14

def initialize(pipeline)
  @pipeline = pipeline
  @collection = AtomicProcessingService::StatusCollection.new(pipeline)
end

Instance Attribute Details

#pipelineObject (readonly)

Returns the value of attribute pipeline.



9
10
11
# File 'app/services/ci/pipeline_processing/atomic_processing_service.rb', line 9

def pipeline
  @pipeline
end

Instance Method Details

#executeObject



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'app/services/ci/pipeline_processing/atomic_processing_service.rb', line 19

def execute
  return unless pipeline.needs_processing?

  # Run the process only if we can obtain an exclusive lease; returns nil if lease is unavailable
  success = try_obtain_lease { process! }

  if success
    # If any jobs changed from stopped to alive status during pipeline processing, we must
    # re-reset their dependent jobs; see https://gitlab.com/gitlab-org/gitlab/-/issues/388539.
    new_alive_jobs.group_by(&:user).each do |user, jobs|
      log_running_reset_skipped_jobs_service(jobs)

      ResetSkippedJobsService.new(project, user).execute(jobs)
    end

    # Re-schedule if we need further processing
    PipelineProcessWorker.perform_async(pipeline.id) if pipeline.needs_processing?
  end

  success
end