Class: ClickHouse::EventsSyncWorker
- Inherits:
- Object
- Object
- ClickHouse::EventsSyncWorker
- Includes:
- ApplicationWorker, Gitlab::ExclusiveLeaseHelpers
- Defined in:
- app/workers/click_house/events_sync_worker.rb
Constant Summary
- MAX_TTL =
The job is scheduled every 3 minutes, and we allow a maximum runtime of 2.5 minutes.
2.5.minutes.to_i
- MAX_RUNTIME =
120.seconds
- BATCH_SIZE =
500
- INSERT_BATCH_SIZE =
5000
- CSV_MAPPING =
{ id: :id, path: :path, author_id: :author_id, target_id: :target_id, target_type: :target_type, action: :raw_action, created_at: :casted_created_at, updated_at: :casted_updated_at }.freeze
- PATH_COLUMN =
Transforms the traversal_ids into a String. Example: group_id/subgroup_id/group_or_projectnamespace_id/
<<~SQL
  (
    CASE
      WHEN project_id IS NOT NULL THEN (SELECT array_to_string(traversal_ids, '/') || '/' FROM namespaces WHERE id = (SELECT project_namespace_id FROM projects WHERE id = events.project_id LIMIT 1) LIMIT 1)
      WHEN group_id IS NOT NULL THEN (SELECT array_to_string(traversal_ids, '/') || '/' FROM namespaces WHERE id = events.group_id LIMIT 1)
      ELSE ''
    END
  ) AS path
SQL
- EVENT_PROJECTIONS =
[ :id, PATH_COLUMN, :author_id, :target_id, :target_type, 'action AS raw_action', 'EXTRACT(epoch FROM created_at) AS casted_created_at', 'EXTRACT(epoch FROM updated_at) AS casted_updated_at' ].freeze
- INSERT_EVENTS_QUERY =
<<~SQL.squish
  INSERT INTO events (#{CSV_MAPPING.keys.join(', ')})
  SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT CSV
SQL
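The constants above fit together roughly as follows: CSV_MAPPING pairs each ClickHouse column with the alias produced by EVENT_PROJECTIONS, and its keys supply the column list of INSERT_EVENTS_QUERY. The sketch below is plain Ruby without Rails, so `squish` is approximated with a regular expression. The module name `EventsCsvSketch`, the helper methods, and the sample row are hypothetical and not part of the worker.

```ruby
# A minimal, Rails-free sketch; names other than CSV_MAPPING are hypothetical.
module EventsCsvSketch
  # Copied from the constant summary: ClickHouse column => projected alias.
  CSV_MAPPING = {
    id: :id, path: :path, author_id: :author_id, target_id: :target_id,
    target_type: :target_type, action: :raw_action,
    created_at: :casted_created_at, updated_at: :casted_updated_at
  }.freeze

  # Builds the INSERT statement; gsub/strip stands in for ActiveSupport's squish.
  def self.insert_query
    <<~SQL.gsub(/\s+/, ' ').strip
      INSERT INTO events (#{CSV_MAPPING.keys.join(', ')})
      SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT CSV
    SQL
  end

  # Ruby equivalent of array_to_string(traversal_ids, '/') || '/' in PATH_COLUMN.
  def self.traversal_path(traversal_ids)
    "#{traversal_ids.join('/')}/"
  end

  # Orders a projected row's values to match the INSERT column list.
  def self.ordered_values(row)
    CSV_MAPPING.values.map { |source| row.fetch(source) }
  end
end

# Hypothetical projected row, keyed by the aliases from EVENT_PROJECTIONS.
row = {
  id: 1, path: EventsCsvSketch.traversal_path([9970, 42, 1337]),
  author_id: 7, target_id: 55, target_type: 'Issue', raw_action: 1,
  casted_created_at: 1_700_000_000.0, casted_updated_at: 1_700_000_060.0
}

puts EventsCsvSketch.insert_query
p EventsCsvSketch.ordered_values(row)
```

Serialising the ordered values into an actual CSV line is left to a CSV library; the point here is only that column order comes from the mapping's keys and values come from its aliases.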
Constants included from Gitlab::ExclusiveLeaseHelpers
Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
Constants included from ApplicationWorker
ApplicationWorker::LOGGING_EXTRA_KEY, ApplicationWorker::SAFE_PUSH_BULK_LIMIT
Constants included from Gitlab::Loggable
Constants included from WorkerAttributes
WorkerAttributes::DEFAULT_DATA_CONSISTENCY, WorkerAttributes::DEFAULT_DEFER_DELAY, WorkerAttributes::NAMESPACE_WEIGHTS, WorkerAttributes::VALID_DATA_CONSISTENCIES, WorkerAttributes::VALID_RESOURCE_BOUNDARIES, WorkerAttributes::VALID_URGENCIES
Instance Method Summary
Methods included from Gitlab::ExclusiveLeaseHelpers
Methods included from Gitlab::Loggable
Methods included from Gitlab::SidekiqVersioning::Worker
Methods included from WorkerContext
Instance Method Details
#perform ⇒ Object
# File 'app/workers/click_house/events_sync_worker.rb', line 57

def perform
  unless enabled?
    log_extra_metadata_on_done(:result, { status: :disabled })
    return
  end

  metadata = { status: :processed }

  begin
    # Prevent parallel jobs
    in_lock(self.class.to_s, ttl: MAX_TTL, retries: 0) do
      loop { break unless next_batch }

      metadata.merge!(records_inserted: context.total_record_count, reached_end_of_table: context.no_more_records?)

      ClickHouse::SyncCursor.update_cursor_for(:events, context.last_processed_id) if context.last_processed_id
    end
  rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
    # Skip retrying; just let the next worker start after a few minutes
    metadata = { status: :skipped }
  end

  log_extra_metadata_on_done(:result, metadata)
end
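The control flow of #perform (take an exclusive lock, process batches until none remain, record the result, and skip quietly when the lock is already held) can be sketched in plain Ruby. This is not the worker's implementation: `SyncLoopSketch`, `FakeLease`, and `run` are hypothetical stand-ins, and the runtime check inside the loop is only an assumption about how MAX_RUNTIME might be enforced, since the batch-fetching code is not shown on this page.

```ruby
# A minimal sketch of the lock-then-batch pattern; all names are hypothetical.
module SyncLoopSketch
  class FailedToObtainLockError < StandardError; end

  # In-memory stand-in for an exclusive lease: a second holder is rejected.
  class FakeLease
    def initialize
      @held = false
    end

    def in_lock
      raise FailedToObtainLockError if @held

      @held = true
      begin
        yield
      ensure
        @held = false
      end
    end
  end

  MONOTONIC_CLOCK = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }

  # Processes ids in batches under the lease and returns a result hash.
  def self.run(lease, ids, batch_size: 2, max_runtime: 120, clock: MONOTONIC_CLOCK)
    started_at = clock.call
    inserted = 0
    last_id = nil
    remaining = ids.dup

    lease.in_lock do
      until remaining.empty?
        # Assumed runtime guard: stop early once the time budget is spent.
        break if clock.call - started_at > max_runtime

        batch = remaining.shift(batch_size)
        inserted += batch.size
        last_id = batch.last
      end
    end

    { status: :processed, records_inserted: inserted,
      reached_end_of_table: remaining.empty?, last_processed_id: last_id }
  rescue FailedToObtainLockError
    # Another run holds the lease; report it and leave the work to the next run.
    { status: :skipped }
  end
end

lease = SyncLoopSketch::FakeLease.new
p SyncLoopSketch.run(lease, [1, 2, 3, 4, 5])
```

Passing `retries: 0` in the real worker serves the same purpose as the immediate `raise` in `FakeLease`: a concurrent run gives up at once instead of waiting, which is safe because the job is scheduled again a few minutes later.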