Class: PipelineRunnerJob

Inherits:
ApplicationJob show all
Defined in:
app/jobs/pipeline_runner_job.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#definition ⇒ Object

Returns the value of attribute definition.



2
3
4
# File 'app/jobs/pipeline_runner_job.rb', line 2

# Reader for the pipeline definition instance. #perform assigns it from
# pipeline.definition_class.new(pipeline, event); it is nil before that.
def definition
  @definition
end

#event ⇒ Object

Returns the value of attribute event.



2
3
4
# File 'app/jobs/pipeline_runner_job.rb', line 2

# Reader for the event driving this run. #perform assigns it to the Event
# built from its +event+ argument, or to pipeline.event when none was given.
def event
  @event
end

#pipeline ⇒ Object

Returns the value of attribute pipeline.



2
3
4
# File 'app/jobs/pipeline_runner_job.rb', line 2

# Reader for the pipeline record this job operates on. #perform assigns it
# from its first argument, or from the Pipeline it creates for a new event.
def pipeline
  @pipeline
end

Instance Method Details

#complete_pipeline ⇒ Object



75
76
77
78
79
80
81
# File 'app/jobs/pipeline_runner_job.rb', line 75

# Finishes the pipeline: runs every stage listed by the definition class's
# +ensure_stages+, in order, then sets the pipeline status to COMPLETE.
#
# Each stage goes through #execute, which re-raises stage errors, so a
# failing stage aborts this method (failing the job) before the COMPLETE
# status is written.
#
# Returns whatever #update_status returns.
def complete_pipeline
  finalizers = definition.class.ensure_stages
  finalizers.each { |stage_name| execute(stage_name) }
  update_status(:complete)
end

#execute(this_stage) ⇒ Object



64
65
66
67
68
69
70
71
72
73
# File 'app/jobs/pipeline_runner_job.rb', line 64

# Runs one stage by invoking the method named +this_stage+ on the definition.
#
# this_stage - method name (Symbol or String) to send to the definition.
#
# Returns the stage method's return value.
# Raises whatever the stage raises: the error is recorded on the pipeline
# (status 'ERROR' plus the exception message) and then re-raised unchanged.
def execute(this_stage)
  definition.send(this_stage)
rescue Exception => stage_error
  # Deliberately broad: every failure is recorded on the pipeline before it
  # is propagated again, so nothing is swallowed here.
  pipeline.update!(status: 'ERROR', error: stage_error.message)
  raise
ensure
  # Runs on success and on failure alike.
  pipeline.save
end

#last_stage_failed? ⇒ Boolean

Returns:

  • (Boolean)


98
99
100
# File 'app/jobs/pipeline_runner_job.rb', line 98

# Whether any Job belonging to the pipeline's current stage has status
# FAILED or CANCELLED.
#
# The statuses are bound as parameters instead of being written as
# double-quoted literals inside the SQL fragment: standard SQL (and
# PostgreSQL) reads "FAILED" as an identifier, not a string.
# The stage comparison stays a `= ?` fragment rather than a hash condition
# so that a nil stage keeps matching no rows, as before.
#
# Returns a Boolean.
def last_stage_failed?
  Job.where(
    'pipeline_id = ? AND pipeline_stage = ? AND status IN (?)',
    pipeline.id, pipeline.stage, %w[FAILED CANCELLED]
  ).exists?
end

#last_stage_pending? ⇒ Boolean

Returns:

  • (Boolean)


102
103
104
# File 'app/jobs/pipeline_runner_job.rb', line 102

# Whether any Job belonging to the pipeline's current stage is still
# waiting, i.e. has status QUEUED or PENDING.
#
# The statuses are bound as parameters instead of being written as
# double-quoted literals inside the SQL fragment: standard SQL (and
# PostgreSQL) reads "QUEUED" as an identifier, not a string.
# The stage comparison stays a `= ?` fragment rather than a hash condition
# so that a nil stage keeps matching no rows, as before.
#
# Returns a Boolean.
def last_stage_pending?
  Job.where(
    'pipeline_id = ? AND pipeline_stage = ? AND status IN (?)',
    pipeline.id, pipeline.stage, %w[QUEUED PENDING]
  ).exists?
end

#next_stage ⇒ Object



83
84
85
86
87
88
89
90
91
92
# File 'app/jobs/pipeline_runner_job.rb', line 83

# The stage that follows the pipeline's current one, taken from the
# definition class's +stages+ list.
#
# With no current stage this is the first entry of the list. Otherwise the
# current stage is downcased and symbolized, located in the list, and the
# entry after it is returned (nil when the current stage is the last one).
# A current stage that is not in the list raises NoMethodError, because the
# lookup yields nil.
#
# The result is memoized in @next_stage, so it stays the same after
# #perform writes it back to the pipeline as the new current stage.
def next_stage
  @next_stage ||=
    if pipeline.stage
      stage_list = definition.class.stages
      current = pipeline.stage.downcase.to_sym
      stage_list[stage_list.index(current) + 1]
    else
      definition.class.stages.first
    end
end

#perform(pipeline_object, event = nil, definition_name = nil) ⇒ Object

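Receives a pipeline as its first argument; alternatively, pass an event and a definition name (with no pipeline) to start a new pipeline.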



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'app/jobs/pipeline_runner_job.rb', line 7

# Advances a pipeline by at most one stage.
#
# Two entry modes:
# * +event+ given: starting point for a new pipeline. +pipeline_object+ must
#   be blank and +definition_name+ must be present. The definition class is
#   asked whether it triggers on the event; if so a Pipeline is created with
#   status 'STARTING', otherwise the job returns without doing anything.
# * +event+ absent: +pipeline_object+ is the existing pipeline to continue.
#
# pipeline_object - existing pipeline, or blank when starting from an event.
# event           - raw event payload passed to Event.new (optional).
# definition_name - name of the definition class, constantized (optional).
#
# Raises Exception ('Wrong code path') when an event arrives together with
# a pipeline, or without a definition name.
#
# NOTE(review): definition_name is constantized as given — confirm callers
# only ever pass trusted class names.
# NOTE(review): #execute records failures with status 'ERROR', while the
# early return below checks :failed / :complete. Whether status?(:failed)
# covers 'ERROR' is not visible here — confirm a retried job cannot move
# past a stage that errored.
def perform(pipeline_object, event = nil, definition_name = nil)
  if event
    event = Event.new(event)
    # this is the starting point for the pipeline if the event is present
    if pipeline_object.present? || definition_name.nil?
      raise Exception, 'Wrong code path'
    end

    return unless definition_name.constantize.trigger_when?(event)

    pipeline_object = Pipeline.create!(
      project: event.project,
      status: 'STARTING',
      event: event.data,
      definition: definition_name
    )
  end

  self.pipeline = pipeline_object
  self.definition = pipeline.definition_class.new(pipeline, event)
  self.event = event || pipeline.event

  # Already marked as failed or complete: nothing left to do.
  return if pipeline.status?(:failed) || pipeline.status?(:complete)

  if last_stage_failed?
    # A job of the current stage failed or was cancelled: fail the pipeline.
    update_status(:failed)
    return
  end

  # Jobs of the current stage are still queued or pending: keep waiting.
  return if last_stage_pending?

  # All checks passed, so the pipeline can move on to the next stage.
  unless next_stage.present?
    # No next stage: the pipeline is complete.
    complete_pipeline
    return
  end

  pipeline.update!(stage: next_stage, status: 'PENDING')
  execute(next_stage)

  # The stage created no jobs, therefore everything is done.
  complete_pipeline unless last_stage_pending?
end

#update_status(status) ⇒ Object



94
95
96
# File 'app/jobs/pipeline_runner_job.rb', line 94

# Persists a new pipeline status.
#
# status - Symbol or String; stored as its upper-cased string form
#          (e.g. :complete becomes "COMPLETE").
#
# Returns the result of pipeline.update!.
def update_status(status)
  normalized = status.to_s.upcase
  pipeline.update!(status: normalized)
end