Module: Gitlab::Import::AdvanceStage

Included in:
GithubImport::AdvanceStageWorker, JiraImport::AdvanceStageWorker
Defined in:
app/workers/gitlab/import/advance_stage.rb

Constant Summary collapse

INTERVAL =
30.seconds.to_i
BLOCKING_WAIT_TIME =

The number of seconds to wait (while blocking the thread) before continuing to the next waiter.

5

Instance Method Summary collapse

Instance Method Details

#find_import_state(project_id) ⇒ Object

Raises:

  • (NotImplementedError)

50
51
52
# File 'app/workers/gitlab/import/advance_stage.rb', line 50

def find_import_state(project_id)
  raise NotImplementedError
end

#perform(project_id, waiters, next_stage) ⇒ Object

project_id - The ID of the project being imported. waiters - A Hash mapping Gitlab::JobWaiter keys to the number of

remaining jobs.

next_stage - The name of the next stage to start when all jobs have been

completed.

17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'app/workers/gitlab/import/advance_stage.rb', line 17

def perform(project_id, waiters, next_stage)
  return unless import_state = find_import_state(project_id)

  new_waiters = wait_for_jobs(waiters)

  if new_waiters.empty?
    # We refresh the import JID here so workers importing individual
    # resources (e.g. notes) don't have to do this all the time, reducing
    # the pressure on Redis. We _only_ do this once all jobs are done so
    # we don't get stuck forever if one or more jobs failed to notify the
    # JobWaiter.
    import_state.refresh_jid_expiration

    next_stage_worker(next_stage).perform_async(project_id)
  else
    self.class.perform_in(INTERVAL, project_id, new_waiters, next_stage)
  end
end

#wait_for_jobs(waiters) ⇒ Object


36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'app/workers/gitlab/import/advance_stage.rb', line 36

def wait_for_jobs(waiters)
  waiters.each_with_object({}) do |(key, remaining), new_waiters|
    waiter = JobWaiter.new(remaining, key)

    # We wait for a brief moment of time so we don't reschedule if we can
    # complete the work fast enough.
    waiter.wait(BLOCKING_WAIT_TIME)

    next unless waiter.jobs_remaining > 0

    new_waiters[waiter.key] = waiter.jobs_remaining
  end
end