Class: PipelineRunnerJob
- Inherits:
-
ApplicationJob
- Object
- ActiveJob::Base
- ApplicationJob
- PipelineRunnerJob
- Defined in:
- app/jobs/pipeline_runner_job.rb
Instance Attribute Summary collapse
-
#definition ⇒ Object
Returns the value of attribute definition.
-
#event ⇒ Object
Returns the value of attribute event.
-
#pipeline ⇒ Object
Returns the value of attribute pipeline.
Instance Method Summary collapse
- #complete_pipeline ⇒ Object
- #execute(this_stage) ⇒ Object
- #last_stage_failed? ⇒ Boolean
- #last_stage_pending? ⇒ Boolean
- #next_stage ⇒ Object
-
#perform(pipeline_object, event = nil, definition_name = nil) ⇒ Object
receives a pipeline as it’s argument.
- #update_status(status) ⇒ Object
Instance Attribute Details
#definition ⇒ Object
Returns the value of attribute definition.
2 3 4 |
# File 'app/jobs/pipeline_runner_job.rb', line 2 def definition @definition end |
#event ⇒ Object
Returns the value of attribute event.
2 3 4 |
# File 'app/jobs/pipeline_runner_job.rb', line 2 def event @event end |
#pipeline ⇒ Object
Returns the value of attribute pipeline.
2 3 4 |
# File 'app/jobs/pipeline_runner_job.rb', line 2 def pipeline @pipeline end |
Instance Method Details
#complete_pipeline ⇒ Object
75 76 77 78 79 80 81 |
# File 'app/jobs/pipeline_runner_job.rb', line 75 def complete_pipeline definition.class.ensure_stages.each do |ensured_stage| # will fail the job if an error is raised execute(ensured_stage) end update_status :complete end |
#execute(this_stage) ⇒ Object
64 65 66 67 68 69 70 71 72 73 |
# File 'app/jobs/pipeline_runner_job.rb', line 64 def execute(this_stage) begin definition.send(this_stage) rescue Exception => e pipeline.update!(status: 'ERROR', error: e.) raise e ensure pipeline.save end end |
#last_stage_failed? ⇒ Boolean
98 99 100 |
# File 'app/jobs/pipeline_runner_job.rb', line 98 def last_stage_failed? Job.where('pipeline_id = ? AND pipeline_stage = ? AND (status = "FAILED" OR status = "CANCELLED")', pipeline.id, pipeline.stage).exists? end |
#last_stage_pending? ⇒ Boolean
102 103 104 |
# File 'app/jobs/pipeline_runner_job.rb', line 102 def last_stage_pending? Job.where('pipeline_id = ? AND pipeline_stage = ? AND (status = "QUEUED" OR status = "PENDING")', pipeline.id, pipeline.stage).exists? end |
#next_stage ⇒ Object
83 84 85 86 87 88 89 90 91 92 |
# File 'app/jobs/pipeline_runner_job.rb', line 83 def next_stage @next_stage ||= begin if pipeline.stage cur_idx = definition.class.stages.find_index { |st| st == pipeline.stage.downcase.to_sym } definition.class.stages[cur_idx + 1] else definition.class.stages[0] end end end |
#perform(pipeline_object, event = nil, definition_name = nil) ⇒ Object
receives a pipeline as it’s argument
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'app/jobs/pipeline_runner_job.rb', line 7 def perform(pipeline_object, event=nil, definition_name=nil) if event event = Event.new(event) # this is the starting point for the pipeline if the event is present raise Exception.new('Wrong code path') if pipeline_object.present? || definition_name.nil? unless definition_name.constantize.trigger_when?(event) return end pipeline_object = Pipeline.create!( project: event.project, status: 'STARTING', event: event.data, definition: definition_name, ) end self.pipeline = pipeline_object self.definition = pipeline.definition_class.new(pipeline, event) self.event = event || pipeline.event if pipeline.status?(:failed) || pipeline.status?(:complete) # pipeline has already been marked as failing or complete return end if last_stage_failed? # fail the pipeline if it's failing update_status :failed return end if last_stage_pending? # nothing to do here, still waiting on some methods to come through return end # if we've gotten here all the checks are passing and we can move onto the next stage unless next_stage.present? # no next stage, pipeline is complete complete_pipeline return end # run the next stage pipeline.update!(stage: next_stage, status: 'PENDING') execute(next_stage) unless last_stage_pending? # no jobs were created during the last stage, therefore everything is done! complete_pipeline end end |
#update_status(status) ⇒ Object
94 95 96 |
# File 'app/jobs/pipeline_runner_job.rb', line 94 def update_status(status) pipeline.update!(status: status.to_s.upcase) end |