Module: JetstreamBridge::StreamSupport

Defined in:
lib/jetstream_bridge/topology/stream.rb

Class Method Summary collapse

Class Method Details

.log_all_blocked(name, blocked) ⇒ Object



37
38
39
40
41
42
43
44
45
46
# File 'lib/jetstream_bridge/topology/stream.rb', line 37

# Logs the outcome when no subjects were added to an existing stream.
#
# Emits a warning listing +blocked+ when it has any truthy entries, otherwise
# an info line saying there was nothing to add.
#
# @param name [Object] stream name, interpolated into the message
# @param blocked [#any?, #inspect] subjects reported as belonging to other streams
# @return [Object] whatever the Logging call returns
def log_all_blocked(name, blocked)
  tag = 'JetstreamBridge::Stream'
  return Logging.info("Stream #{name} exists; nothing to add.", tag: tag) unless blocked.any?

  message = "Stream #{name}: all missing subjects belong to other streams; unchanged. " \
            "blocked=#{blocked.inspect}"
  Logging.warn(message, tag: tag)
end

.log_already_covered(name) ⇒ Object

---- Logging ----



30
31
32
33
34
35
# File 'lib/jetstream_bridge/topology/stream.rb', line 30

# Logs (info level) that the stream exists and needs no changes.
#
# @param name [Object] stream name, interpolated into the message
# @return [Object] whatever Logging.info returns
def log_already_covered(name)
  message = "Stream #{name} exists; subjects and config already covered."
  Logging.info(message, tag: 'JetstreamBridge::Stream')
end

.log_config_updated(name, storage:) ⇒ Object



72
73
74
75
76
77
# File 'lib/jetstream_bridge/topology/stream.rb', line 72

# Logs (info level) that a stream's config was updated, including the storage value.
#
# @param name [Object] stream name, interpolated into the message
# @param storage [#inspect] storage setting, rendered with #inspect
# @return [Object] whatever Logging.info returns
def log_config_updated(name, storage:)
  message = "Updated stream #{name} config; storage=#{storage.inspect}"
  Logging.info(message, tag: 'JetstreamBridge::Stream')
end

.log_created(name, allowed, blocked, retention, storage) ⇒ Object



61
62
63
64
65
66
67
68
69
70
# File 'lib/jetstream_bridge/topology/stream.rb', line 61

# Logs (info level) that a stream was created, with its subjects, retention and storage.
#
# When +blocked+ has any truthy entries, a "(skipped overlapped=...)" suffix
# listing them is appended to the message.
#
# @param name [Object] stream name, interpolated into the message
# @param allowed [#inspect] subjects the stream was created with
# @param blocked [#any?, #inspect] subjects that were skipped
# @param retention [#inspect] retention setting
# @param storage [#inspect] storage setting
# @return [Object] whatever Logging.info returns
def log_created(name, allowed, blocked, retention, storage)
  summary = "Created stream #{name} subjects=#{allowed.inspect} " \
            "retention=#{retention.inspect} storage=#{storage.inspect}"
  summary = "#{summary} (skipped overlapped=#{blocked.inspect})" if blocked.any?
  Logging.info(summary, tag: 'JetstreamBridge::Stream')
end

.log_not_created(name, blocked) ⇒ Object



54
55
56
57
58
59
# File 'lib/jetstream_bridge/topology/stream.rb', line 54

# Logs a warning that the stream was not created because every desired
# subject is reported as belonging to other streams.
#
# @param name [Object] stream name, interpolated into the message
# @param blocked [#inspect] the blocked subjects, rendered with #inspect
# @return [Object] whatever Logging.warn returns
def log_not_created(name, blocked)
  message = "Not creating stream #{name}: all desired subjects belong to other streams. " \
            "blocked=#{blocked.inspect}"
  Logging.warn(message, tag: 'JetstreamBridge::Stream')
end

.log_retention_mismatch(name, have:, want:) ⇒ Object



79
80
81
82
83
84
85
# File 'lib/jetstream_bridge/topology/stream.rb', line 79

# Logs a warning that the stream's current retention differs from the desired
# one and that the retention change is being skipped.
#
# @param name [Object] stream name, interpolated into the message
# @param have [#inspect] retention currently on the stream
# @param want [#inspect] retention that was requested
# @return [Object] whatever Logging.warn returns
def log_retention_mismatch(name, have:, want:)
  mismatch = "Stream #{name} retention mismatch (have=#{have.inspect}, want=#{want.inspect})."
  Logging.warn(
    "#{mismatch} Retention is immutable; skipping retention change.",
    tag: 'JetstreamBridge::Stream'
  )
end

.log_updated(name, added, blocked) ⇒ Object



48
49
50
51
52
# File 'lib/jetstream_bridge/topology/stream.rb', line 48

# Logs (info level) that subjects were added to an existing stream.
#
# When +blocked+ has any truthy entries, a "(skipped overlapped=...)" suffix
# listing them is appended to the message.
#
# @param name [Object] stream name, interpolated into the message
# @param added [#inspect] subjects that were added
# @param blocked [#any?, #inspect] subjects that were skipped
# @return [Object] whatever Logging.info returns
def log_updated(name, added, blocked)
  headline = "Updated stream #{name}; added subjects=#{added.inspect}"
  skipped = blocked.any? ? " (skipped overlapped=#{blocked.inspect})" : ''
  Logging.info("#{headline}#{skipped}", tag: 'JetstreamBridge::Stream')
end

.missing_subjects(existing, desired) ⇒ Object



15
16
17
# File 'lib/jetstream_bridge/topology/stream.rb', line 15

# Returns the entries of +desired+ for which
# SubjectMatcher.covered?(existing, entry) is falsy.
#
# @param existing [Object] passed through unchanged as the first argument to SubjectMatcher.covered?
# @param desired [Enumerable] candidate subjects to filter
# @return [Object] the filtered collection, in the original order
def missing_subjects(existing, desired)
  desired.select do |subject|
    !SubjectMatcher.covered?(existing, subject)
  end
end

.normalize_subjects(list) ⇒ Object



11
12
13
# File 'lib/jetstream_bridge/topology/stream.rb', line 11

# Normalizes +list+ into a flat array of unique, non-empty strings.
#
# The input is wrapped with Array(), flattened, stripped of nils, and each
# remaining entry is converted with #to_s. Empty strings and duplicates are
# then dropped, keeping first-occurrence order.
#
# @param list [Object] a single subject, a (possibly nested) array of subjects, or nil
# @return [Array<String>] normalized subjects
def normalize_subjects(list)
  subjects = Array(list).flatten.compact.map(&:to_s)
  subjects.uniq.reject(&:empty?)
end

.overlap_error?(error) ⇒ Boolean

Returns:

  • (Boolean)


24
25
26
27
# File 'lib/jetstream_bridge/topology/stream.rb', line 24

# Reports whether +error+'s message looks like a subject-overlap rejection.
#
# The check is purely textual. It matches, in order: "subject(s) overlap"
# wording (case-insensitive), the token err_code=10065, or a standalone 400.
#
# Uses String#match? so the result is a strict true/false, as the documented
# Boolean return promises. The previous =~ chain returned a match offset or nil.
#
# NOTE(review): any message containing a standalone "400" is treated as an
# overlap error -- confirm this breadth is intended.
#
# @param error [#message] the raised error to inspect
# @return [Boolean] true when any of the patterns matches the message
def overlap_error?(error)
  msg = error.message.to_s
  msg.match?(/subjects?\s+overlap/i) ||
    msg.match?(/\berr_code=10065\b/) ||
    msg.match?(/\b400\b/)
end

.stream_not_found?(error) ⇒ Boolean

Returns:

  • (Boolean)


19
20
21
22
# File 'lib/jetstream_bridge/topology/stream.rb', line 19

# Reports whether +error+'s message looks like a "stream not found" failure.
#
# The check is purely textual. It matches "stream not found" wording
# (case-insensitive, any whitespace between words) or a standalone 404.
#
# Uses String#match? so the result is a strict true/false, as the documented
# Boolean return promises. The previous =~ chain returned a match offset or nil.
#
# @param error [#message] the raised error to inspect
# @return [Boolean] true when either pattern matches the message
def stream_not_found?(error)
  msg = error.message.to_s
  msg.match?(/stream\s+not\s+found/i) || msg.match?(/\b404\b/)
end