Class: JetstreamBridge::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/jetstream_bridge/topology/stream.rb

Overview

Ensures a stream exists and updates only uncovered subjects, using work-queue semantics.

Constant Summary collapse

RETENTION =
'workqueue'
STORAGE =
'file'

Class Method Summary collapse

Class Method Details

.ensure!(jts, name, subjects) ⇒ Object

Raises:

  • (ArgumentError)


94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/jetstream_bridge/topology/stream.rb', line 94

def ensure!(jts, name, subjects)
  desired = StreamSupport.normalize_subjects(subjects)
  raise ArgumentError, 'subjects must not be empty' if desired.empty?

  attempts = 0
  max_attempts = 3
  backoffs = [0.05, 0.2, 0.5]

  begin
    info = safe_stream_info(jts, name)
    info ? ensure_update(jts, name, info, desired) : ensure_create(jts, name, desired)
  rescue NATS::JetStream::Error => e
    if StreamSupport.overlap_error?(e) && (attempts += 1) <= max_attempts
      backoff = backoffs[attempts - 1] || backoffs.last
      Logging.warn(
        "Overlap race while ensuring #{name}; retry #{attempts}/#{max_attempts} after #{backoff}s...",
        tag: 'JetstreamBridge::Stream'
      )
      sleep(backoff)
      retry
    elsif StreamSupport.overlap_error?(e)
      Logging.warn(
        "Overlap persists ensuring #{name} after #{attempts} attempts; leaving unchanged. " \
        "err=#{e.message.inspect}",
        tag: 'JetstreamBridge::Stream'
      )
      nil
    else
      raise
    end
  end
end