Module: Gitlab::Import::AdvanceStage

Constant Summary collapse

INTERVAL =
30.seconds.to_i
TIMEOUT_DURATION =
2.hours
AdvanceStageTimeoutError =
Class.new(StandardError)
BLOCKING_WAIT_TIME =

The number of seconds to wait (while blocking the thread) before continuing to the next waiter.

5

Instance Method Summary collapse

Instance Method Details

#find_import_state(id) ⇒ Object

Raises:

  • (NotImplementedError)


72
73
74
# File 'app/workers/gitlab/import/advance_stage.rb', line 72

# Hook for looking up the import state identified by `id`.
#
# This module provides no implementation: the default always raises, so
# the class using this module is expected to override it.
# NOTE(review): the expected return value is not visible from this module;
# presumably the import state record (or nil) — confirm against implementers.
#
# id - The identifier used to look up the import state.
#
# Raises NotImplementedError when the hook has not been overridden.
def find_import_state(id)
  # Name the receiver's class so the missing override is easy to locate.
  raise NotImplementedError, "#{self.class}#find_import_state must be implemented by the including class"
end

#find_import_state_jid(project_id) ⇒ Object

Raises:

  • (NotImplementedError)


68
69
70
# File 'app/workers/gitlab/import/advance_stage.rb', line 68

# Hook for looking up the import state JID of the given project.
#
# `perform` calls this first and treats a nil result as "nothing left to
# advance". This module provides no implementation: the default always
# raises, so the class using this module is expected to override it.
#
# project_id - The ID of the project being imported.
#
# Raises NotImplementedError when the hook has not been overridden.
def find_import_state_jid(project_id)
  # Name the receiver's class so the missing override is easy to locate.
  raise NotImplementedError, "#{self.class}#find_import_state_jid must be implemented by the including class"
end

#perform(project_id, waiters, next_stage, timeout_timer = Time.zone.now, previous_job_count = nil) ⇒ Object

project_id - The ID of the project being imported.

waiters - A Hash mapping Gitlab::JobWaiter keys to the number of remaining jobs.

next_stage - The name of the next stage to start when all jobs have been completed.

timeout_timer - The time at which the Sidekiq worker was first initiated with the current job count.

previous_job_count - The number of jobs remaining on the last invocation of this worker.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'app/workers/gitlab/import/advance_stage.rb', line 22

# Checks on the jobs of the current import stage and then either moves on
# to the next stage, handles a timeout, or reschedules itself.
#
# project_id         - The ID of the project being imported.
# waiters            - A Hash mapping Gitlab::JobWaiter keys to the number of
#                      remaining jobs.
# next_stage         - The name of the next stage to start when all jobs
#                      have been completed.
# timeout_timer      - Time the worker was first initiated with the current
#                      job count. Defaults to the current time.
# previous_job_count - Number of jobs remaining on the last invocation of
#                      this worker, or nil when there was none.
#
# Returns nil when no import state JID is found; otherwise returns whatever
# the selected branch (next stage, timeout handling or rescheduling) returns.
def perform(project_id, waiters, next_stage, timeout_timer = Time.zone.now, previous_job_count = nil)
  jid = find_import_state_jid(project_id)

  # A missing JID means the project may have been deleted, or the import may
  # have failed or been canceled. Tidy up the cached waiter data and stop
  # trying to advance.
  if jid.nil?
    clear_waiter_caches(waiters)
    return
  end

  project = Project.find_by_id(project_id)
  pending_waiters = wait_for_jobs(waiters)
  pending_count = pending_waiters.values.sum

  # The number of remaining jobs changed since the last run, so progress is
  # being made: restart the timeout clock from now.
  unless pending_count == previous_job_count
    timeout_timer = Time.zone.now
    previous_job_count = pending_count
  end

  # Nothing left to wait for: the stage is complete.
  return proceed_to_next_stage(jid, next_stage, project_id) if pending_waiters.empty?

  # The count comparison always holds here because of the reset above; it is
  # kept so the timeout condition stays explicit.
  timed_out = Feature.enabled?(:advance_stage_timeout, project) &&
    timeout_reached?(timeout_timer) &&
    pending_count == previous_job_count

  if timed_out
    handle_timeout(jid, next_stage, project_id, pending_waiters, pending_count)
  else
    # Jobs are still running and the timeout has not fired: check again later.
    self.class.perform_in(INTERVAL, project_id, pending_waiters, next_stage, timeout_timer, previous_job_count)
  end
end

#wait_for_jobs(waiters) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'app/workers/gitlab/import/advance_stage.rb', line 54

# Gives each waiter a short, blocking chance to finish and reports which
# ones still have outstanding jobs.
#
# waiters - A Hash mapping JobWaiter keys to the number of remaining jobs.
#
# Returns a new Hash mapping the key of every waiter that still has jobs
# remaining to that remaining count; finished waiters are left out.
def wait_for_jobs(waiters)
  still_pending = {}

  waiters.each do |key, remaining|
    job_waiter = JobWaiter.new(remaining, key)

    # Block briefly on each waiter: if the work completes quickly enough
    # there is no need to reschedule for it.
    job_waiter.wait(BLOCKING_WAIT_TIME)

    still_pending[job_waiter.key] = job_waiter.jobs_remaining if job_waiter.jobs_remaining.positive?
  end

  still_pending
end