Class: ClickHouse::EventsSyncWorker

Inherits:
Object
  • Object
show all
Includes:
ApplicationWorker, Gitlab::ExclusiveLeaseHelpers
Defined in:
app/workers/click_house/events_sync_worker.rb

Constant Summary collapse

MAX_TTL =

The job is scheduled every 3 minutes, so we allow a maximum runtime of 2.5 minutes.

2.5.minutes.to_i
MAX_RUNTIME =
120.seconds
BATCH_SIZE =
500
INSERT_BATCH_SIZE =
5000
CSV_MAPPING =
{
  id: :id,
  path: :path,
  author_id: :author_id,
  target_id: :target_id,
  target_type: :target_type,
  action: :raw_action,
  created_at: :casted_created_at,
  updated_at: :casted_updated_at
}.freeze
PATH_COLUMN =

Transforms the traversal_ids into a slash-separated String, for example: group_id/subgroup_id/group_or_project_namespace_id/

<<~SQL
  (
    CASE
      WHEN project_id IS NOT NULL THEN (SELECT array_to_string(traversal_ids, '/') || '/' FROM namespaces WHERE id = (SELECT project_namespace_id FROM projects WHERE id = events.project_id LIMIT 1) LIMIT 1)
      WHEN group_id IS NOT NULL THEN (SELECT array_to_string(traversal_ids, '/') || '/' FROM namespaces WHERE id = events.group_id LIMIT 1)
      ELSE ''
    END
  ) AS path
SQL
EVENT_PROJECTIONS =
[
  :id,
  PATH_COLUMN,
  :author_id,
  :target_id,
  :target_type,
  'action AS raw_action',
  'EXTRACT(epoch FROM created_at) AS casted_created_at',
  'EXTRACT(epoch FROM updated_at) AS casted_updated_at'
].freeze
INSERT_EVENTS_QUERY =
<<~SQL.squish
  INSERT INTO events (#{CSV_MAPPING.keys.join(', ')})
  SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT CSV
SQL

Constants included from Gitlab::ExclusiveLeaseHelpers

Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError

Constants included from ApplicationWorker

ApplicationWorker::LOGGING_EXTRA_KEY, ApplicationWorker::SAFE_PUSH_BULK_LIMIT

Constants included from Gitlab::Loggable

Gitlab::Loggable::ANONYMOUS

Constants included from WorkerAttributes

WorkerAttributes::DEFAULT_DATA_CONSISTENCY, WorkerAttributes::DEFAULT_DEFER_DELAY, WorkerAttributes::NAMESPACE_WEIGHTS, WorkerAttributes::VALID_DATA_CONSISTENCIES, WorkerAttributes::VALID_RESOURCE_BOUNDARIES, WorkerAttributes::VALID_URGENCIES

Instance Method Summary collapse

Methods included from Gitlab::ExclusiveLeaseHelpers

#in_lock

Methods included from Gitlab::Loggable

#build_structured_payload

Methods included from Gitlab::SidekiqVersioning::Worker

#job_version

Methods included from WorkerContext

#with_context

Instance Method Details

#perform ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'app/workers/click_house/events_sync_worker.rb', line 57

# Worker entry point for the ClickHouse events sync.
#
# Flow:
# 1. If `enabled?` is false, log `{ status: :disabled }` under `:result` and return.
# 2. Take an exclusive lease named after this class (ttl: MAX_TTL, retries: 0)
#    so that jobs never run in parallel.
# 3. Call `next_batch` repeatedly until it returns a falsy value.
# 4. Add `records_inserted` and `reached_end_of_table` (read from `context`)
#    to the result metadata.
# 5. Persist `context.last_processed_id` through ClickHouse::SyncCursor,
#    but only when it is present.
# If the lease cannot be obtained, the result is `{ status: :skipped }` and
# the job is not retried.
#
# The final metadata hash is always logged under `:result`.
#
# NOTE(review): `enabled?`, `next_batch` and `context` are defined elsewhere in
# this class; the notes above describe only how this method uses them.
def perform
  unless enabled?
    log_extra_metadata_on_done(:result, { status: :disabled })

    return
  end

  metadata = { status: :processed }

  begin
    # Prevent parallel jobs
    in_lock(self.class.to_s, ttl: MAX_TTL, retries: 0) do
      loop { break unless next_batch }

      metadata.merge!(records_inserted: context.total_record_count, reached_end_of_table: context.no_more_records?)

      ClickHouse::SyncCursor.update_cursor_for(:events, context.last_processed_id) if context.last_processed_id
    end
  rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
    # Another job holds the lease: skip retrying and let the next scheduled run do the work.
    metadata = { status: :skipped }
  end

  log_extra_metadata_on_done(:result, metadata)
end