Module: Gitlab::Import::AdvanceStage
- Included in:
- BitbucketImport::AdvanceStageWorker, BitbucketServerImport::AdvanceStageWorker, GithubImport::AdvanceStageWorker, JiraImport::AdvanceStageWorker
- Defined in:
- app/workers/gitlab/import/advance_stage.rb
Constant Summary collapse
- INTERVAL =
30.seconds.to_i
- TIMEOUT_DURATION =
2.hours
- AdvanceStageTimeoutError =
Class.new(StandardError)
- BLOCKING_WAIT_TIME =
The number of seconds to wait (while blocking the thread) before continuing to the next waiter.
5
Instance Method Summary collapse
- #find_import_state(id) ⇒ Object
- #find_import_state_jid(project_id) ⇒ Object
-
#perform(project_id, waiters, next_stage, timeout_timer = Time.zone.now, previous_job_count = nil) ⇒ Object
project_id - The ID of the project being imported.
- #wait_for_jobs(waiters) ⇒ Object
Instance Method Details
#find_import_state(id) ⇒ Object
72 73 74 |
# File 'app/workers/gitlab/import/advance_stage.rb', line 72
#
# Hook that each including worker is expected to override. This default
# implementation only signals that the override is missing.
#
# id - Identifier supplied by the caller.
#      NOTE(review): presumably the ID of the import state to load; confirm
#      against the including workers.
#
# Raises NotImplementedError always; the message names the class that lacks
# the override so the failure can be traced without reading a backtrace.
def find_import_state(id)
  raise NotImplementedError, "#{self.class} must implement #find_import_state"
end
#find_import_state_jid(project_id) ⇒ Object
68 69 70 |
# File 'app/workers/gitlab/import/advance_stage.rb', line 68
#
# Hook that each including worker is expected to override. This default
# implementation only signals that the override is missing.
#
# project_id - The ID of the project being imported.
#
# #perform treats a nil result from the override as "stop advancing" and
# otherwise forwards the value to the next-stage and timeout handlers.
#
# Raises NotImplementedError always; the message names the class that lacks
# the override so the failure can be traced without reading a backtrace.
def find_import_state_jid(project_id)
  raise NotImplementedError, "#{self.class} must implement #find_import_state_jid"
end
#perform(project_id, waiters, next_stage, timeout_timer = Time.zone.now, previous_job_count = nil) ⇒ Object
project_id - The ID of the project being imported.
waiters - A Hash mapping Gitlab::JobWaiter keys to the number of remaining jobs.
next_stage - The name of the next stage to start when all jobs have been completed.
timeout_timer - The time at which the Sidekiq worker was first initiated with the current job count.
previous_job_count - The number of jobs remaining on the last invocation of this worker.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'app/workers/gitlab/import/advance_stage.rb', line 22
#
# Checks the outstanding job waiters once and then either starts the next
# stage, handles a timeout, or reschedules this worker to check again.
#
# project_id         - The ID of the project being imported.
# waiters            - A Hash mapping Gitlab::JobWaiter keys to the number of
#                      remaining jobs.
# next_stage         - The name of the next stage to start when all jobs have
#                      been completed.
# timeout_timer      - Timestamp the timeout is measured from; it is restarted
#                      below whenever the remaining job count changes.
# previous_job_count - Number of jobs remaining on the last invocation of this
#                      worker (nil on the first invocation).
def perform(project_id, waiters, next_stage, timeout_timer = Time.zone.now, previous_job_count = nil)
  jid = find_import_state_jid(project_id)

  # A nil import state means the project may have been deleted, or the import
  # may have failed or been canceled. Tidy up the cached waiter data and stop
  # trying to advance to the next stage.
  if jid.nil?
    clear_waiter_caches(waiters)
    return
  end

  project = Project.find_by_id(project_id)
  pending_waiters = wait_for_jobs(waiters)
  pending_count = pending_waiters.values.sum

  # The remaining job count changed since the last run, so restart the
  # timeout window and remember the new count.
  unless pending_count == previous_job_count
    timeout_timer = Time.zone.now
    previous_job_count = pending_count
  end

  if pending_waiters.empty?
    proceed_to_next_stage(jid, next_stage, project_id)
  elsif Feature.enabled?(:advance_stage_timeout, project) &&
      timeout_reached?(timeout_timer) &&
      pending_count == previous_job_count
    handle_timeout(jid, next_stage, project_id, pending_waiters, pending_count)
  else
    # Jobs are still running and no timeout applies: check again later,
    # carrying the timer and count forward.
    self.class.perform_in(INTERVAL, project_id, pending_waiters, next_stage, timeout_timer, previous_job_count)
  end
end
#wait_for_jobs(waiters) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'app/workers/gitlab/import/advance_stage.rb', line 54
#
# Waits briefly on every waiter and collects the ones that still have jobs
# outstanding.
#
# waiters - A collection of (key, remaining job count) pairs, one per waiter.
#
# Returns a Hash mapping each unfinished waiter's key to its remaining job
# count; waiters with nothing remaining are left out.
def wait_for_jobs(waiters)
  unfinished = {}

  waiters.each do |key, remaining|
    waiter = JobWaiter.new(remaining, key)

    # Block for a short while so that work which completes quickly does not
    # force a reschedule.
    waiter.wait(BLOCKING_WAIT_TIME)

    unfinished[waiter.key] = waiter.jobs_remaining if waiter.jobs_remaining > 0
  end

  unfinished
end