Module: JetstreamBridge::StreamSupport
- Defined in:
- lib/jetstream_bridge/topology/stream.rb
Class Method Summary collapse
- .log_all_blocked(name, blocked) ⇒ Object
-
.log_already_covered(name) ⇒ Object
—- Logging —-.
- .log_config_updated(name, storage:) ⇒ Object
- .log_created(name, allowed, blocked, retention, storage) ⇒ Object
- .log_not_created(name, blocked) ⇒ Object
- .log_retention_mismatch(name, have:, want:) ⇒ Object
- .log_updated(name, added, blocked) ⇒ Object
- .missing_subjects(existing, desired) ⇒ Object
- .normalize_subjects(list) ⇒ Object
- .overlap_error?(error) ⇒ Boolean
- .stream_not_found?(error) ⇒ Boolean
Class Method Details
.log_all_blocked(name, blocked) ⇒ Object
37 38 39 40 41 42 43 44 45 46 |
# File 'lib/jetstream_bridge/topology/stream.rb', line 37 def log_all_blocked(name, blocked) if blocked.any? Logging.warn( "Stream #{name}: all missing subjects belong to other streams; unchanged. blocked=#{blocked.inspect}", tag: 'JetstreamBridge::Stream' ) else Logging.info("Stream #{name} exists; nothing to add.", tag: 'JetstreamBridge::Stream') end end |
.log_already_covered(name) ⇒ Object
—- Logging —-
30 31 32 33 34 35 |
# File 'lib/jetstream_bridge/topology/stream.rb', line 30 def log_already_covered(name) Logging.info( "Stream #{name} exists; subjects and config already covered.", tag: 'JetstreamBridge::Stream' ) end |
.log_config_updated(name, storage:) ⇒ Object
72 73 74 75 76 77 |
# File 'lib/jetstream_bridge/topology/stream.rb', line 72 def log_config_updated(name, storage:) Logging.info( "Updated stream #{name} config; storage=#{storage.inspect}", tag: 'JetstreamBridge::Stream' ) end |
.log_created(name, allowed, blocked, retention, storage) ⇒ Object
61 62 63 64 65 66 67 68 69 70 |
# File 'lib/jetstream_bridge/topology/stream.rb', line 61 def log_created(name, allowed, blocked, retention, storage) msg = [ "Created stream #{name}", "subjects=#{allowed.inspect}", "retention=#{retention.inspect}", "storage=#{storage.inspect}" ].join(' ') msg += " (skipped overlapped=#{blocked.inspect})" if blocked.any? Logging.info(msg, tag: 'JetstreamBridge::Stream') end |
.log_not_created(name, blocked) ⇒ Object
54 55 56 57 58 59 |
# File 'lib/jetstream_bridge/topology/stream.rb', line 54 def log_not_created(name, blocked) Logging.warn( "Not creating stream #{name}: all desired subjects belong to other streams. blocked=#{blocked.inspect}", tag: 'JetstreamBridge::Stream' ) end |
.log_retention_mismatch(name, have:, want:) ⇒ Object
79 80 81 82 83 84 85 |
# File 'lib/jetstream_bridge/topology/stream.rb', line 79 def log_retention_mismatch(name, have:, want:) Logging.warn( "Stream #{name} retention mismatch (have=#{have.inspect}, want=#{want.inspect}). " \ 'Retention is immutable; skipping retention change.', tag: 'JetstreamBridge::Stream' ) end |
.log_updated(name, added, blocked) ⇒ Object
48 49 50 51 52 |
# File 'lib/jetstream_bridge/topology/stream.rb', line 48 def log_updated(name, added, blocked) msg = "Updated stream #{name}; added subjects=#{added.inspect}" msg += " (skipped overlapped=#{blocked.inspect})" if blocked.any? Logging.info(msg, tag: 'JetstreamBridge::Stream') end |
.missing_subjects(existing, desired) ⇒ Object
15 16 17 |
# File 'lib/jetstream_bridge/topology/stream.rb', line 15 def missing_subjects(existing, desired) desired.reject { |d| SubjectMatcher.covered?(existing, d) } end |
.normalize_subjects(list) ⇒ Object
11 12 13 |
# File 'lib/jetstream_bridge/topology/stream.rb', line 11 def normalize_subjects(list) Array(list).flatten.compact.map!(&:to_s).reject(&:empty?).uniq end |
.overlap_error?(error) ⇒ Boolean
24 25 26 27 |
# File 'lib/jetstream_bridge/topology/stream.rb', line 24 def overlap_error?(error) msg = error..to_s msg =~ /subjects?\s+overlap/i || msg =~ /\berr_code=10065\b/ || msg =~ /\b400\b/ end |
.stream_not_found?(error) ⇒ Boolean
19 20 21 22 |
# File 'lib/jetstream_bridge/topology/stream.rb', line 19 def stream_not_found?(error) msg = error..to_s msg =~ /stream\s+not\s+found/i || msg =~ /\b404\b/ end |