Class: JetstreamBridge::OverlapGuard
- Inherits: Object
- Inheritance hierarchy: Object → JetstreamBridge::OverlapGuard
- Defined in:
- lib/jetstream_bridge/topology/overlap_guard.rb
Overview
Checks for overlapping subjects.
Constant Summary collapse
- CACHE_TTL = 60 (seconds)
Class Method Summary collapse
- .allowed_subjects(jts, target_name, desired_subjects) ⇒ Object
-
.check!(jts, target_name, new_subjects) ⇒ Object
Raise if any desired subjects conflict with other streams.
-
.clear_cache! ⇒ Object
Clear the cache (useful for testing).
- .conflict_message(target, conflicts) ⇒ Object
-
.fetch_streams_uncached(jts) ⇒ Object
Fetch stream metadata without caching (for internal use).
- .js_api_request(jts, subject, payload = {}) ⇒ Object
- .list_stream_names(jts) ⇒ Object
- .list_streams_with_subjects(jts) ⇒ Object
-
.overlaps(jts, target_name, new_subjects) ⇒ Object
Return a list of conflicts against other streams, per subject.
-
.partition_allowed(jts, target_name, desired_subjects) ⇒ Object
Returns [allowed, blocked] given desired subjects.
Class Method Details
.allowed_subjects(jts, target_name, desired_subjects) ⇒ Object
50 51 52 |
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 50
# Returns only the subjects that are allowed, i.e. the first element of the
# [allowed, blocked] pair produced by `partition_allowed`.
#
# All three arguments are forwarded to `partition_allowed` unchanged.
def allowed_subjects(jts, target_name, desired_subjects)
  allowed, _blocked = partition_allowed(jts, target_name, desired_subjects)
  allowed
end
.check!(jts, target_name, new_subjects) ⇒ Object
Raise if any desired subjects conflict with other streams.
18 19 20 21 22 23 |
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 18
# Raise if any desired subjects conflict with other streams.
#
# @param jts forwarded to `overlaps` to look up existing streams
# @param target_name name of the stream being checked; forwarded to `overlaps`
#   and used in the error text
# @param new_subjects subjects to check; forwarded to `overlaps`
# @return [nil] when `overlaps` reports no conflicts
# @raise [RuntimeError] carrying the text built by `conflict_message` when at
#   least one conflict is reported
def check!(jts, target_name, new_subjects)
  conflicts = overlaps(jts, target_name, new_subjects)
  return if conflicts.empty?

  raise conflict_message(target_name, conflicts)
end
.clear_cache! ⇒ Object
Clear the cache (useful for testing).
86 87 88 89 90 91 |
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 86
# Clear the cache (useful for testing).
#
# While holding @cache_mutex, replaces the cached stream data with an empty
# hash and moves the expiry to the epoch, so the freshness check in
# `list_streams_with_subjects` fails on the next call.
def clear_cache!
  @cache_mutex.synchronize do
    @cache_expires_at = Time.at(0)
    @stream_cache = {}
  end
end
.conflict_message(target, conflicts) ⇒ Object
138 139 140 141 142 143 144 145 |
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 138
# Builds the multi-line, human-readable description of subject conflicts.
#
# @param target stream name interpolated into the header line
# @param conflicts [Array<Hash>] entries with a :name key and a :pairs key,
#   where :pairs is a list of two-element arrays
# @return [String] a header line, then one "- Conflicts with ..." line per
#   entry followed by one bullet line per pair; every line ends in "\n"
def conflict_message(target, conflicts)
  # Collect lines and join once instead of appending to a string literal in place.
  lines = ["Overlapping subjects for stream #{target}:\n"]
  conflicts.each do |conflict|
    lines << "- Conflicts with '#{conflict[:name]}' on:\n"
    conflict[:pairs].each { |(wanted, existing)| lines << " • #{wanted} × #{existing}\n" }
  end
  lines.join
end
.fetch_streams_uncached(jts) ⇒ Object
Fetch stream metadata without caching (for internal use).
94 95 96 97 98 99 100 101 102 |
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 94
# Fetch stream metadata without caching (for internal use).
#
# For every name returned by `list_stream_names`, calls `jts.stream_info` and
# reads the subjects from the returned config.
#
# @return [Array<Hash>] one `{ name:, subjects: }` hash per stream; subjects is
#   always an Array (empty when the config carries none)
def fetch_streams_uncached(jts)
  list_stream_names(jts).map do |stream_name|
    config = jts.stream_info(stream_name).config
    # The config may expose subjects as a reader method or only via [:subjects].
    raw_subjects =
      if config.respond_to?(:subjects)
        config.subjects
      else
        config[:subjects]
      end
    { name: stream_name, subjects: Array(raw_subjects || []) }
  end
end
.js_api_request(jts, subject, payload = {}) ⇒ Object
132 133 134 135 136 |
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 132
# Sends a request on `subject` through the client exposed as `jts.nc` and
# returns the parsed reply.
#
# @param jts object expected to expose the underlying NATS client as `nc`
# @param subject subject the request is sent on
# @param payload request body, serialised with Oj in :compat mode
# @return the reply's `data` parsed by Oj in :strict mode
def js_api_request(jts, subject, payload = {})
  client = jts.nc
  encoded_payload = Oj.dump(payload, mode: :compat)
  reply = client.request(subject, encoded_payload)
  Oj.load(reply.data, mode: :strict)
end
.list_stream_names(jts) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 104
# Collects stream names by paging through '$JS.API.STREAM.NAMES'.
#
# Each request passes the number of names collected so far as the offset.
# Paging stops once the collected count reaches the reply's 'total' or a page
# comes back empty. At most 100 requests are made; if that cap is reached
# without finishing, a warning is logged and the names gathered so far are
# returned.
#
# @return [Array] the collected stream names
def list_stream_names(jts)
  request_cap = 100 # Safety limit to prevent infinite loops
  collected = []

  request_cap.times do
    reply = js_api_request(jts, '$JS.API.STREAM.NAMES', { offset: collected.size })
    page = Array(reply['streams']).filter_map { |entry| entry['name'] }
    collected.concat(page)

    expected_total = reply['total'].to_i
    return collected if collected.size >= expected_total || page.empty?
  end

  # Only reached when every allowed request was used without finishing.
  Logging.warn(
    "Stream listing exceeded max iterations (#{request_cap}), returning #{collected.size} streams",
    tag: 'JetstreamBridge::OverlapGuard'
  )
  collected
end
.list_streams_with_subjects(jts) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 54
# Returns stream metadata, served from the cache while it is still fresh.
#
# Under @cache_mutex: if the expiry time has not passed and the cache holds
# :data, that data is returned. Otherwise `fetch_streams_uncached` is called,
# and its result is stored with an expiry of now + CACHE_TTL.
#
# Any StandardError is logged as a warning and the method falls back to the
# previously cached data, or an empty array when nothing has been cached.
def list_streams_with_subjects(jts)
  @cache_mutex.synchronize do
    now = Time.now
    cache_fresh = now < @cache_expires_at && @stream_cache.key?(:data)

    if cache_fresh
      Logging.debug(
        'Using cached stream metadata',
        tag: 'JetstreamBridge::OverlapGuard'
      )
      @stream_cache[:data]
    else
      Logging.debug(
        'Fetching fresh stream metadata from NATS',
        tag: 'JetstreamBridge::OverlapGuard'
      )
      fetch_streams_uncached(jts).tap do |fresh|
        @stream_cache = { data: fresh }
        @cache_expires_at = now + CACHE_TTL
      end
    end
  end
rescue StandardError => error
  Logging.warn(
    "Failed to fetch stream metadata: #{error.class} #{error.message}",
    tag: 'JetstreamBridge::OverlapGuard'
  )
  # Return cached data on error if available, otherwise empty array
  stale = @cache_mutex.synchronize { @stream_cache[:data] }
  stale || []
end
.overlaps(jts, target_name, new_subjects) ⇒ Object
Return a list of conflicts against other streams, per subject.
[{ name: 'OTHER', pairs: [['a.b.*', 'a.b.c'], …] }, …]
27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 27
# Return a list of conflicts against other streams, per subject.
#
# Streams whose :name equals `target_name` are ignored. Streams with no
# overlapping subject are omitted from the result.
#
# @param jts forwarded to `list_streams_with_subjects`
# @param target_name name of the stream to exclude from the comparison
# @param new_subjects a subject or list of subjects; each is converted with
#   `to_s` and duplicates are dropped. The caller's object is not modified.
# @return [Array<Hash>] e.g.
#   [{ name: 'OTHER', pairs: [['a.b.*', 'a.b.c'], ...] }, ...]
#   where each pair is [desired_subject, existing_subject]
def overlaps(jts, target_name, new_subjects)
  # Array() hands back the very same object when given an Array, so use the
  # non-mutating map here; map! would rewrite (or fail on a frozen) caller array.
  desired = Array(new_subjects).map(&:to_s).uniq
  others = list_streams_with_subjects(jts).reject { |stream| stream[:name] == target_name }

  others.filter_map do |stream|
    # Stringify the existing subjects once per stream, not once per desired subject.
    existing = Array(stream[:subjects]).map(&:to_s)
    pairs = desired.flat_map do |wanted|
      existing.select { |subject| SubjectMatcher.overlap?(wanted, subject) }
              .map { |subject| [wanted, subject] }
    end
    { name: stream[:name], pairs: pairs } unless pairs.empty?
  end
end
.partition_allowed(jts, target_name, desired_subjects) ⇒ Object
Returns [allowed, blocked] given desired subjects.
42 43 44 45 46 47 48 |
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 42
# Returns [allowed, blocked] given desired subjects.
#
# A subject is blocked when it appears as the first element of any pair
# reported by `overlaps`; every other desired subject is allowed.
#
# @param jts forwarded to `overlaps`
# @param target_name forwarded to `overlaps`
# @param desired_subjects a subject or list of subjects; each is converted
#   with `to_s` and duplicates are dropped. The caller's object is not modified.
# @return [Array(Array<String>, Array<String>)] the allowed and blocked subjects
def partition_allowed(jts, target_name, desired_subjects)
  # Non-mutating map: Array() returns the caller's own Array, which map!
  # would rewrite in place (or raise on when frozen).
  desired = Array(desired_subjects).map(&:to_s).uniq
  conflicts = overlaps(jts, target_name, desired)
  blocked = conflicts.flat_map { |conflict| conflict[:pairs].map(&:first) }.uniq
  [desired - blocked, blocked]
end