Class: Gitlab::TopologyServiceClient::ConcurrencyLimitService
- Inherits:
-
Object
- Object
- Gitlab::TopologyServiceClient::ConcurrencyLimitService
- Defined in:
- lib/gitlab/topology_service_client/concurrency_limit_service.rb
Overview
This service manages a GLOBAL concurrency limit for the Topology Service. At any moment in time, there should not be more than the configured limit of concurrent requests to the Topology Service. This is a cell-wide limit that applies to ALL requests, regardless of source or type.
Uses a Redis Hash to track individual requests by process ID and request ID, similar to Sidekiq’s WorkerExecutionTracker. This approach prevents counter drift when requests fail without proper cleanup.
Configuration:
-
ApplicationSettings stores the limit value (topology_service_concurrency_limit)
-
Feature flag controls enforcement mode (log-only vs enforce)
Constant Summary collapse
- REDIS_KEY_EXECUTING =
'topology_service:concurrency_limit:executing'- TRACKING_KEY_TTL =
5.minutes
- CHECK_AND_ADD_REQUEST_SCRIPT =
"local key, request_id, limit = KEYS[1], ARGV[1], tonumber(ARGV[2])\nlocal current_time = redis.call(\"TIME\")[1]\nlocal current_count = redis.call(\"hlen\", key)\nif current_count >= limit then\n return 0\nend\nredis.call(\"hset\", key, request_id, current_time)\nreturn 1\n"
Class Method Summary collapse
-
.cleanup_stale_requests ⇒ Hash?
Clean up stale request entries that are older than TTL Called by StaleRequestsCleanupCronWorker on a schedule.
-
.concurrency_limit ⇒ Integer
Get current concurrency limit from ApplicationSettings.
-
.concurrent_request_count ⇒ Integer
The current concurrent request count.
-
.enforce_mode_enabled? ⇒ Boolean
True if enforce mode is enabled.
- .extract_method_name(grpc_method, fallback: 'unknown') ⇒ Object
-
.track_request_end(request_id) ⇒ Object
Track request end by removing the entry from the hash.
-
.track_request_start(grpc_method: nil) ⇒ String?
Track request start by atomically checking the limit and adding an entry to the hash.
Class Method Details
.cleanup_stale_requests ⇒ Hash?
Clean up stale request entries that are older than TTL Called by StaleRequestsCleanupCronWorker on a schedule
88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/gitlab/topology_service_client/concurrency_limit_service.rb', line 88 def cleanup_stale_requests cutoff_time = TRACKING_KEY_TTL.ago.utc.to_i removed_count = 0 with_redis_suppressed_errors do |r| stale_requests = r.hgetall(REDIS_KEY_EXECUTING) .select { |_id, started_at| started_at.to_i < cutoff_time } next if stale_requests.empty? removed_count = r.hdel(REDIS_KEY_EXECUTING, stale_requests.keys) end { removed_count: removed_count } end |
.concurrency_limit ⇒ Integer
Get current concurrency limit from ApplicationSettings
41 42 43 |
# File 'lib/gitlab/topology_service_client/concurrency_limit_service.rb', line 41 def concurrency_limit Gitlab::CurrentSettings.topology_service_concurrency_limit end |
.concurrent_request_count ⇒ Integer
Returns The current concurrent request count.
51 52 53 |
# File 'lib/gitlab/topology_service_client/concurrency_limit_service.rb', line 51 def concurrent_request_count with_redis_suppressed_errors { |r| r.hlen(REDIS_KEY_EXECUTING) }.to_i end |
.enforce_mode_enabled? ⇒ Boolean
Returns true if enforce mode is enabled.
46 47 48 |
# File 'lib/gitlab/topology_service_client/concurrency_limit_service.rb', line 46 def enforce_mode_enabled? Feature.enabled?(:topology_service_concurrency_limit, :instance, type: :ops) end |
.extract_method_name(grpc_method, fallback: 'unknown') ⇒ Object
102 103 104 105 106 |
# File 'lib/gitlab/topology_service_client/concurrency_limit_service.rb', line 102 def extract_method_name(grpc_method, fallback: 'unknown') return fallback if grpc_method.blank? grpc_method.rpartition('/').last.presence || fallback end |
.track_request_end(request_id) ⇒ Object
Track request end by removing the entry from the hash
77 78 79 80 81 82 83 |
# File 'lib/gitlab/topology_service_client/concurrency_limit_service.rb', line 77 def track_request_end(request_id) return if request_id.nil? with_redis_suppressed_errors do |r| r.hdel(REDIS_KEY_EXECUTING, request_id) end end |
.track_request_start(grpc_method: nil) ⇒ String?
Track request start by atomically checking the limit and adding an entry to the hash. Uses a Lua script to ensure the check and increment are atomic, preventing race conditions.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/gitlab/topology_service_client/concurrency_limit_service.rb', line 59 def track_request_start(grpc_method: nil) request_id = generate_request_id(grpc_method) result = with_redis_suppressed_errors do |r| r.eval( CHECK_AND_ADD_REQUEST_SCRIPT, keys: [REDIS_KEY_EXECUTING], argv: [request_id, concurrency_limit] ) end return request_id if result.nil? # Redis failure case, allow request result == 1 ? request_id : nil end |