Class: JetstreamBridge::OverlapGuard

Inherits:
Object
  • Object
show all
Defined in:
lib/jetstream_bridge/topology/overlap_guard.rb

Overview

Checks for overlapping subjects.

Constant Summary collapse

CACHE_TTL =

seconds

60

Class Method Summary collapse

Class Method Details

.allowed_subjects(jts, target_name, desired_subjects) ⇒ Object



50
51
52
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 50

# Returns only the subjects that partition_allowed reports as allowed.
#
# @param jts [Object] JetStream handle, forwarded untouched to partition_allowed
# @param target_name [String] name of the stream being configured
# @param desired_subjects [Array, String, nil] subjects requested for the stream
# @return [Array] first element of the pair returned by partition_allowed
def allowed_subjects(jts, target_name, desired_subjects)
  allowed, = partition_allowed(jts, target_name, desired_subjects)
  allowed
end

.check!(jts, target_name, new_subjects) ⇒ Object

Raise if any desired subjects conflict with other streams.



18
19
20
21
22
23
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 18

# Raises when any of +new_subjects+ overlaps a subject of another stream.
#
# @param jts [Object] JetStream handle, forwarded to overlaps
# @param target_name [String] stream being configured
# @param new_subjects [Array, String, nil] subjects requested for the stream
# @return [nil] when overlaps reports no conflicts
# @raise [RuntimeError] carrying the text built by conflict_message
def check!(jts, target_name, new_subjects)
  found = overlaps(jts, target_name, new_subjects)
  raise conflict_message(target_name, found) unless found.empty?
end

.clear_cache! ⇒ Object

Clear the cache (useful for testing)



86
87
88
89
90
91
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 86

# Drops the cached stream metadata while holding the cache mutex.
#
# The expiry is set to the Unix epoch, so the freshness check in
# list_streams_with_subjects (now < expiry) fails on the next call.
#
# @return [Time] the epoch timestamp stored as the new expiry
def clear_cache!
  epoch = Time.at(0)
  @cache_mutex.synchronize do
    @stream_cache = {}
    @cache_expires_at = epoch
  end
end

.conflict_message(target, conflicts) ⇒ Object



138
139
140
141
142
143
144
145
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 138

# Builds the multi-line error text describing every conflicting subject pair.
#
# @param target [#to_s] name of the stream being configured
# @param conflicts [Array<Hash>] entries with :name and :pairs keys, where
#   each pair is a two-element [desired, existing] array
# @return [String] header line, then one section per conflicting stream;
#   every line ends with a newline
def conflict_message(target, conflicts)
  lines = ["Overlapping subjects for stream #{target}:"]
  conflicts.each do |conflict|
    lines << "- Conflicts with '#{conflict[:name]}' on:"
    conflict[:pairs].each do |(wanted, existing)|
      lines << "    • #{wanted} × #{existing}"
    end
  end
  lines.map { |line| "#{line}\n" }.join
end

.fetch_streams_uncached(jts) ⇒ Object

Fetch stream metadata without caching (for internal use)



94
95
96
97
98
99
100
101
102
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 94

# Looks up every stream returned by list_stream_names and collects its subjects,
# bypassing the cache.
#
# @param jts [#stream_info] JetStream handle; stream_info(name) must return an
#   object responding to #config
# @return [Array<Hash>] one { name:, subjects: } hash per stream, with
#   subjects always an Array (empty when the config carries none)
def fetch_streams_uncached(jts)
  list_stream_names(jts).map do |stream_name|
    config = jts.stream_info(stream_name).config
    # The config may expose subjects as a reader method or as a :subjects key.
    raw_subjects =
      if config.respond_to?(:subjects)
        config.subjects
      else
        config[:subjects]
      end
    { name: stream_name, subjects: raw_subjects ? Array(raw_subjects) : [] }
  end
end

.js_api_request(jts, subject, payload = {}) ⇒ Object



132
133
134
135
136
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 132

# Sends a request on +subject+ through the client exposed as jts.nc and
# decodes the reply.
#
# @param jts [#nc] JetStream handle; nc must respond to request(subject, body)
# @param subject [String] subject to send the request on
# @param payload [Hash] request body, serialized with Oj in :compat mode
# @return [Object] the reply's data parsed by Oj in :strict mode
def js_api_request(jts, subject, payload = {})
  # JetStream client should expose the underlying NATS client as `nc`
  client  = jts.nc
  encoded = Oj.dump(payload, mode: :compat)
  reply   = client.request(subject, encoded)
  Oj.load(reply.data, mode: :strict)
end

.list_stream_names(jts) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 104

# Collects the names of all streams by paging through $JS.API.STREAM.NAMES.
#
# Each request passes the number of names gathered so far as the offset.
# Paging stops when the reported total is reached, when a page yields no
# names, or after 100 requests (a warning is logged in that last case).
#
# Entries of the reply's "streams" array may be plain strings (the shape
# described by the JetStream API reference for STREAM.NAMES) or hashes with a
# "name" key. Indexing a plain string with 'name' is a substring lookup that
# yields nil, so string entries must not be treated as hashes.
#
# @param jts [Object] JetStream handle, forwarded to js_api_request
# @return [Array] stream names in the order the server returned them
def list_stream_names(jts)
  names  = []
  offset = 0
  max_iterations = 100 # Safety limit to prevent infinite loops
  iterations = 0

  loop do
    iterations += 1
    if iterations > max_iterations
      Logging.warn(
        "Stream listing exceeded max iterations (#{max_iterations}), returning #{names.size} streams",
        tag: 'JetstreamBridge::OverlapGuard'
      )
      break
    end

    resp  = js_api_request(jts, '$JS.API.STREAM.NAMES', { offset: offset })
    batch = Array(resp['streams']).filter_map do |entry|
      # Hash entries carry the name under 'name'; anything else is the name itself.
      entry.is_a?(Hash) ? entry['name'] : entry
    end
    names.concat(batch)
    total = resp['total'].to_i

    # An empty page also ends the loop so a wrong total cannot spin forever.
    break if names.size >= total || batch.empty?

    offset = names.size
  end
  names
end

.list_streams_with_subjects(jts) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 54

# Returns stream metadata, served from the cache while it is still fresh.
#
# The cache is refreshed through fetch_streams_uncached and kept for
# CACHE_TTL seconds. The lookup and the refresh both run while holding the
# cache mutex.
#
# Any StandardError is logged as a warning and swallowed. In that case the
# last cached data is returned if there is any, otherwise an empty array.
#
# @param jts [Object] JetStream handle, forwarded to fetch_streams_uncached
# @return [Array] cached or freshly fetched stream entries
def list_streams_with_subjects(jts)
  log_tag = 'JetstreamBridge::OverlapGuard'

  @cache_mutex.synchronize do
    checked_at = Time.now
    cache_fresh = checked_at < @cache_expires_at && @stream_cache.key?(:data)

    if cache_fresh
      Logging.debug('Using cached stream metadata', tag: log_tag)
      @stream_cache[:data]
    else
      Logging.debug('Fetching fresh stream metadata from NATS', tag: log_tag)
      fetch_streams_uncached(jts).tap do |streams|
        @stream_cache = { data: streams }
        @cache_expires_at = checked_at + CACHE_TTL
      end
    end
  end
rescue StandardError => e
  Logging.warn(
    "Failed to fetch stream metadata: #{e.class} #{e.message}",
    tag: 'JetstreamBridge::OverlapGuard'
  )
  # Fall back to whatever was cached last; the mutex is free again by now.
  @cache_mutex.synchronize { @stream_cache[:data] || [] }
end

.overlaps(jts, target_name, new_subjects) ⇒ Object

Return a list of conflicts against other streams, per subject.

[{ name: 'OTHER', pairs: [['a.b.*', 'a.b.c'], …] }, …]



27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 27

# Finds the subjects of other streams that overlap with +new_subjects+.
#
# The desired subjects are stringified and de-duplicated on a copy, so the
# caller's array is never modified and frozen arrays are accepted.
#
# @param jts [Object] JetStream handle, forwarded to list_streams_with_subjects
# @param target_name [String] stream being configured; its own entry is skipped
# @param new_subjects [Array, String, Symbol, nil] subjects requested for the stream
# @return [Array<Hash>] one { name:, pairs: } hash per conflicting stream, where
#   pairs holds [desired, existing] string pairs for which
#   SubjectMatcher.overlap? is truthy; streams without conflicts are omitted
def overlaps(jts, target_name, new_subjects)
  # Non-mutating map: Array() hands back the caller's own array when given one.
  desired = Array(new_subjects).map(&:to_s).uniq
  others  = list_streams_with_subjects(jts).reject { |s| s[:name] == target_name }

  others.filter_map do |stream|
    # Stringify once per stream instead of once per desired subject.
    existing = Array(stream[:subjects]).map(&:to_s)
    pairs = desired.product(existing).select do |wanted, taken|
      SubjectMatcher.overlap?(wanted, taken)
    end
    { name: stream[:name], pairs: pairs } unless pairs.empty?
  end
end

.partition_allowed(jts, target_name, desired_subjects) ⇒ Object

Returns [allowed, blocked] given desired subjects.



42
43
44
45
46
47
48
# File 'lib/jetstream_bridge/topology/overlap_guard.rb', line 42

# Splits the desired subjects into those free to use and those that conflict.
#
# The desired subjects are stringified and de-duplicated on a copy, so the
# caller's array is never modified and frozen arrays are accepted.
#
# @param jts [Object] JetStream handle, forwarded to overlaps
# @param target_name [String] stream being configured
# @param desired_subjects [Array, String, Symbol, nil] subjects requested for the stream
# @return [Array(Array<String>, Array<String>)] [allowed, blocked]; blocked lists
#   each desired subject appearing first in any conflict pair, without repeats
def partition_allowed(jts, target_name, desired_subjects)
  # Non-mutating map: Array() hands back the caller's own array when given one.
  desired   = Array(desired_subjects).map(&:to_s).uniq
  conflicts = overlaps(jts, target_name, desired)
  blocked   = conflicts.flat_map { |c| c[:pairs].map(&:first) }.uniq
  [desired - blocked, blocked]
end