Class: Gitlab::TopologyServiceClient::ConcurrencyLimitService

Inherits: Object

Defined in: lib/gitlab/topology_service_client/concurrency_limit_service.rb

Overview

This service manages a GLOBAL concurrency limit for the Topology Service. At any moment in time, there should not be more than the configured limit of concurrent requests to the Topology Service. This is a cell-wide limit that applies to ALL requests, regardless of source or type.

Uses a Redis Hash to track individual requests by process ID and request ID, similar to Sidekiq’s WorkerExecutionTracker. This approach prevents counter drift when requests fail without proper cleanup.

Configuration:

  • ApplicationSettings stores the limit value (topology_service_concurrency_limit)

  • Feature flag controls enforcement mode (log-only vs enforce)

Constant Summary

REDIS_KEY_EXECUTING = 'topology_service:concurrency_limit:executing'

TRACKING_KEY_TTL = 5.minutes

CHECK_AND_ADD_REQUEST_SCRIPT =

Lua script for atomically checking the concurrency limit and adding a request. Returns 1 if the request was added (limit not exceeded), 0 if rejected. Uses Redis server time for consistency with cleanup operations. KEYS[1]: REDIS_KEY_EXECUTING (hash key) ARGV[1]: request_id ARGV[2]: concurrency_limit

  local key, request_id, limit = KEYS[1], ARGV[1], tonumber(ARGV[2])
  local current_time = redis.call("TIME")[1]
  local current_count = redis.call("hlen", key)
  if current_count >= limit then
    return 0
  end
  redis.call("hset", key, request_id, current_time)
  return 1

Class Method Summary

Class Method Details

.cleanup_stale_requests ⇒ Hash?

Clean up stale request entries that are older than the TTL. Called by StaleRequestsCleanupCronWorker on a schedule.

Returns:

  • (Hash, nil)

    Hash with :removed_count on success, nil if Redis unavailable



# File 'lib/gitlab/topology_service_client/concurrency_limit_service.rb', line 88

def cleanup_stale_requests
  cutoff_time = TRACKING_KEY_TTL.ago.utc.to_i
  removed_count = 0

  with_redis_suppressed_errors do |r|
    stale_requests = r.hgetall(REDIS_KEY_EXECUTING)
                      .select { |_id, started_at| started_at.to_i < cutoff_time }
    next if stale_requests.empty?

    removed_count = r.hdel(REDIS_KEY_EXECUTING, stale_requests.keys)
  end
  { removed_count: removed_count }
end
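The stale-entry selection above can be sketched in plain Ruby, with a Hash standing in for the Redis hash and the 5-minute TTL written out as seconds (the real code uses ActiveSupport's `5.minutes.ago`); `stale_request_ids` is an illustrative name, not part of the service:

```ruby
# TTL mirroring TRACKING_KEY_TTL (5 minutes), in seconds.
TTL_SECONDS = 5 * 60

# Given a hash of request_id => started_at (epoch seconds) and the
# current time, return the IDs whose entries have outlived the TTL.
def stale_request_ids(executing, now)
  cutoff_time = now - TTL_SECONDS
  executing.select { |_id, started_at| started_at.to_i < cutoff_time }.keys
end
```

An entry started 10 minutes ago is selected for removal, while one started 10 seconds ago survives; this is what keeps the counter from drifting when a process dies before calling track_request_end.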

.concurrency_limit ⇒ Integer

Get current concurrency limit from ApplicationSettings

Returns:

  • (Integer)

    The current limit



# File 'lib/gitlab/topology_service_client/concurrency_limit_service.rb', line 41

def concurrency_limit
  Gitlab::CurrentSettings.topology_service_concurrency_limit
end

.concurrent_request_count ⇒ Integer

Returns the current concurrent request count.

Returns:

  • (Integer)

    The current concurrent request count



# File 'lib/gitlab/topology_service_client/concurrency_limit_service.rb', line 51

def concurrent_request_count
  with_redis_suppressed_errors { |r| r.hlen(REDIS_KEY_EXECUTING) }.to_i
end

.enforce_mode_enabled? ⇒ Boolean

Returns true if enforce mode is enabled.

Returns:

  • (Boolean)

    true if enforce mode is enabled



# File 'lib/gitlab/topology_service_client/concurrency_limit_service.rb', line 46

def enforce_mode_enabled?
  Feature.enabled?(:topology_service_concurrency_limit, :instance, type: :ops)
end
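The interplay between a rejected track_request_start and enforce mode can be sketched as follows. This is a hedged illustration of the documented log-only vs. enforce behavior, not the actual client code; `admit_request?` is a hypothetical helper:

```ruby
# Hypothetical caller-side decision: track_request_start returns nil
# when the limit is exceeded. In log-only mode the request proceeds
# with a warning; in enforce mode it is refused.
def admit_request?(request_id, enforce_mode)
  return true unless request_id.nil? # limit not exceeded, proceed

  if enforce_mode
    false # reject the request
  else
    warn 'topology service concurrency limit exceeded (log-only mode)'
    true
  end
end
```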

.extract_method_name(grpc_method, fallback: 'unknown') ⇒ Object



# File 'lib/gitlab/topology_service_client/concurrency_limit_service.rb', line 102

def extract_method_name(grpc_method, fallback: 'unknown')
  return fallback if grpc_method.blank?

  grpc_method.rpartition('/').last.presence || fallback
end
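A plain-Ruby equivalent of extract_method_name, shown because the original relies on Rails' `String#blank?` and `#presence` (a sketch, with the ActiveSupport calls replaced by stdlib checks):

```ruby
# Extract the bare method name from a full gRPC path such as
# '/TopologyService/GetCell', falling back when the input is
# nil, blank, or has no trailing segment.
def extract_method_name(grpc_method, fallback: 'unknown')
  return fallback if grpc_method.nil? || grpc_method.strip.empty?

  name = grpc_method.rpartition('/').last
  name.empty? ? fallback : name
end
```

`rpartition('/')` splits on the last slash, so '/TopologyService/GetCell' yields 'GetCell', while a bare 'GetCell' (no slash) is returned unchanged.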

.track_request_end(request_id) ⇒ Object

Track request end by removing the entry from the hash

Parameters:

  • request_id (String)

    The request ID returned by track_request_start



# File 'lib/gitlab/topology_service_client/concurrency_limit_service.rb', line 77

def track_request_end(request_id)
  return if request_id.nil?

  with_redis_suppressed_errors do |r|
    r.hdel(REDIS_KEY_EXECUTING, request_id)
  end
end

.track_request_start(grpc_method: nil) ⇒ String?

Track request start by atomically checking the limit and adding an entry to the hash. Uses a Lua script to ensure the check and increment are atomic, preventing race conditions.

Parameters:

  • grpc_method (String) (defaults to: nil)

    The gRPC method being called (e.g., '/TopologyService/GetCell')

Returns:

  • (String, nil)

    The unique request ID if limit not exceeded, nil if rejected



# File 'lib/gitlab/topology_service_client/concurrency_limit_service.rb', line 59

def track_request_start(grpc_method: nil)
  request_id = generate_request_id(grpc_method)

  result = with_redis_suppressed_errors do |r|
    r.eval(
      CHECK_AND_ADD_REQUEST_SCRIPT,
      keys: [REDIS_KEY_EXECUTING],
      argv: [request_id, concurrency_limit]
    )
  end

  return request_id if result.nil? # Redis failure case, allow request

  result == 1 ? request_id : nil
end
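The check-and-add semantics that the Lua script enforces atomically can be sketched in memory, with a Ruby Hash standing in for the Redis hash. `InMemoryLimiter` is an illustrative name, not part of the real service, and this single-threaded sketch omits the atomicity that motivates the Lua script in the first place:

```ruby
# In-memory model of the concurrency tracking: a hash of
# request_id => started_at, admitted only while under the limit.
class InMemoryLimiter
  def initialize(limit)
    @limit = limit
    @executing = {} # request_id => started_at (epoch seconds)
  end

  # Returns the request_id if admitted, nil if the limit is reached
  # (mirroring the 1/0 return of CHECK_AND_ADD_REQUEST_SCRIPT).
  def track_request_start(request_id)
    return nil if @executing.size >= @limit

    @executing[request_id] = Time.now.to_i
    request_id
  end

  def track_request_end(request_id)
    @executing.delete(request_id)
  end
end
```

Callers would pair track_request_start with track_request_end in an ensure block so failed requests still release their slot; anything left behind by a crashed process is swept by cleanup_stale_requests once the TTL expires.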