Class: JetstreamBridge::SubscriptionManager

Inherits:
Object
  • Object
show all
Defined in:
lib/jetstream_bridge/consumer/subscription_manager.rb

Overview

Manages durable consumer provisioning and subscription lifecycle.

Encapsulates the ensure-consumer + subscribe flow for both pull and push consumer modes, with automatic fallback between modes.

Instance Method Summary collapse

Constructor Details

#initialize(jts, durable, cfg = JetstreamBridge.config) ⇒ SubscriptionManager

Returns a new instance of SubscriptionManager.

Parameters:

  • jts (NATS::JetStream)

    JetStream context

  • durable (String)

    Durable consumer name

  • cfg (Config) (defaults to: JetstreamBridge.config)

    Configuration instance



18
19
20
21
22
23
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 18

# Stores the collaborators and precomputes the desired consumer configuration.
#
# @param jts [NATS::JetStream] JetStream context
# @param durable [String] durable consumer name
# @param cfg [Config] configuration instance (defaults to JetstreamBridge.config)
def initialize(jts, durable, cfg = JetstreamBridge.config)
  @cfg = cfg
  @durable = durable
  @jts = jts
  # filter_subject reads @cfg, so this must run after @cfg has been assigned.
  @desired_cfg = build_consumer_config(durable, filter_subject)
end

Instance Method Details

#consumer_exists? ⇒ Boolean

Check if consumer exists in the stream.

Returns:

  • (Boolean)

    true if consumer exists, false otherwise



73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 73

# Checks whether the durable consumer exists on the stream by requesting its info.
#
# A failed lookup counts as "absent" when the error message (case-insensitive) contains
# 'not found', 'does not exist', 'no responders', or mentions 'consumer' at all.
# Any other error is re-raised unchanged.
#
# @return [Boolean] true if the lookup succeeds, false if the consumer is considered absent
# @raise [StandardError] the original lookup error when it is not recognised as "absent"
def consumer_exists?
  @jts.consumer_info(stream_name, @durable)
  true
rescue StandardError => e
  text = e.message.to_s.downcase
  absent = ['not found', 'does not exist', 'no responders'].any? { |phrase| text.include?(phrase) }

  # Unexpected errors propagate; anything that names the consumer is treated as absence.
  raise unless absent || text.include?('consumer')

  false
end

#create_consumer_if_missing! ⇒ Object

Create consumer only if it doesn’t already exist. Fails if stream is missing.

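Raises:

  • (StreamNotFoundError)

    if the stream does not exist

  • (ConsumerProvisioningError)

    if consumer management is skipped (auto_provision=false with push mode) or permission is denied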



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 88

# Creates the durable consumer unless it already exists.
#
# Order of checks: fail fast when consumer management is skipped, fail fast when the
# stream is missing, return early when the consumer is already there, otherwise create it.
# A failure whose message contains 'already' or 'exists' is treated as a lost race with
# another process and is not an error.
#
# @return [Object, nil] the result of create_consumer! when a consumer is created, nil otherwise
# @raise [JetstreamBridge::ConsumerProvisioningError] when consumer management is skipped
#   or the failure is classified as permission denied
# @raise [StreamNotFoundError] when the stream does not exist
# @raise [StandardError] any other failure, re-raised after being logged
def create_consumer_if_missing!
  # In restricted environments (push + auto_provision=false), we still want fail-fast semantics.
  # Raise a clear error without touching JetStream so operators know the consumer must be
  # pre-provisioned or the account must allow the minimal $JS.API consumer permissions.
  if skip_consumer_management?
    raise JetstreamBridge::ConsumerProvisioningError,
          "Consumer '#{@durable}' not ensured because auto_provision=false and push mode is enabled. " \
          'Provision the consumer with admin credentials or grant minimal consumer create/info permissions.'
  end

  # First, verify stream exists - fail fast with clear error if not
  unless stream_exists?
    raise StreamNotFoundError,
          "Stream '#{stream_name}' does not exist. " \
          'Streams must be provisioned separately ' \
          '(use auto_provision=true or run provisioning with admin credentials).'
  end

  if consumer_exists?
    Logging.info(
      "Consumer #{@durable} already exists (stream=#{stream_name})",
      tag: 'JetstreamBridge::Consumer'
    )
    return
  end

  Logging.info(
    "Consumer #{@durable} not found, auto-creating on stream #{stream_name}...",
    tag: 'JetstreamBridge::Consumer'
  )
  create_consumer!
rescue StreamNotFoundError, JetstreamBridge::ConsumerProvisioningError
  # Deliberate fail-fast errors must propagate untouched. Their messages embed the durable
  # name, so letting them reach the 'already'/'exists' heuristic below could swallow them.
  raise
rescue StandardError => e
  raise ConsumerProvisioningError, permission_error_message(e) if permission_denied?(e)

  # If creation fails due to consumer already existing (race condition), that's OK
  msg = e.message.to_s.downcase
  if msg.include?('already') || msg.include?('exists')
    Logging.info(
      "Consumer #{@durable} was created by another process",
      tag: 'JetstreamBridge::Consumer'
    )
    return
  end

  Logging.error(
    "Failed to auto-create consumer #{@durable}: #{e.class} #{e.message}",
    tag: 'JetstreamBridge::Consumer'
  )
  raise
end

#desired_consumer_cfg ⇒ Object



33
34
35
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 33

# Returns the consumer configuration stored in @desired_cfg, which the constructor
# builds via build_consumer_config from the durable name and filter subject.
#
# @return [Object] whatever build_consumer_config produced
def desired_consumer_cfg
  @desired_cfg
end

#ensure_consumer!(**_options) ⇒ void

This method returns an undefined value.

Ensure the durable consumer exists on the server.

Skips all JetStream API calls when auto_provision is false, assuming the consumer was pre-provisioned externally.

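Raises:

  • (StreamNotFoundError)

    if auto_provision is enabled and the stream does not exist

  • (ConsumerProvisioningError)

    if auto_provision is enabled and the consumer cannot be provisioned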



45
46
47
48
49
50
51
52
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 45

# Ensures the durable consumer exists, but only when auto-provisioning is enabled.
#
# With auto_provision disabled, no lookup or creation is attempted: the method logs that
# validation was skipped and returns nil.
#
# @param _options [Hash] accepted and ignored
# @return [Object, nil] the result of create_consumer_if_missing!, or nil when skipped
def ensure_consumer!(**_options)
  return create_consumer_if_missing! if @cfg.auto_provision

  Logging.info(
    "Skipping consumer validation (auto_provision=false); assuming '#{@durable}' exists.",
    tag: 'JetstreamBridge::Consumer'
  )
  nil
end

#filter_subject ⇒ Object



29
30
31
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 29

# Returns the configuration's destination_subject. This class passes it to
# build_consumer_config and to pull_subscribe as the subject argument.
#
# @return [Object] value of @cfg.destination_subject
def filter_subject
  @cfg.destination_subject
end

#stream_exists? ⇒ Boolean

Check if stream exists.

Returns:

  • (Boolean)

    true if stream exists, false otherwise



57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 57

# Checks whether the configured stream exists by requesting its info.
#
# A failed lookup counts as "absent" when the error message (case-insensitive) contains
# 'not found', 'does not exist', 'no responders', or the word 'stream' anywhere.
# Any other error is re-raised unchanged.
#
# @return [Boolean] true if the lookup succeeds, false if the stream is considered absent
# @raise [StandardError] the original lookup error when it is not recognised as "absent"
def stream_exists?
  @jts.stream_info(stream_name)
  true
rescue StandardError => e
  text = e.message.to_s.downcase
  absent = ['not found', 'does not exist', 'no responders'].any? { |phrase| text.include?(phrase) }

  # Unexpected errors propagate; anything that names the stream is treated as absence.
  raise unless absent || text.include?('stream')

  false
end

#stream_name ⇒ Object



25
26
27
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 25

# Returns the stream name from the configuration. It is used for the stream and
# consumer lookups in this class and in its log and error messages.
#
# @return [Object] value of @cfg.stream_name
def stream_name
  @cfg.stream_name
end

#subscribe! ⇒ Object

Bind a subscriber to the existing durable consumer.

Automatically selects pull or push mode based on configuration, with fallback to the opposite mode if the primary subscription fails.

Returns:

  • (Object)

    NATS subscription handle (pull or push)

Raises:



148
149
150
151
152
153
154
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 148

# Subscribes using the mode selected by the configuration.
#
# Push mode is tried first when @cfg.push_consumer? is truthy, pull mode otherwise.
# Each path falls back to the other mode if it fails.
#
# @return [Object] whatever the chosen subscription path returns
def subscribe!
  return subscribe_push_with_fallback if @cfg.push_consumer?

  subscribe_pull_with_fallback
end

#subscribe_pull_with_fallback ⇒ Object



204
205
206
207
208
209
210
211
212
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 204

# Attempts a pull subscription and falls back to a push subscription on failure.
#
# The fallback runs inside the rescue handler, so an error raised by the push
# attempt keeps the original pull failure as its cause.
#
# @return [Object] the pull subscription, or the push subscription after a fallback
def subscribe_pull_with_fallback
  subscribe_without_verification!
rescue JetstreamBridge::ConnectionError, StandardError => error
  reason = "#{error.class}: #{error.message}"
  Logging.warn(
    "Pull subscription failed (#{reason}); falling back to push subscription for #{@durable}",
    tag: 'JetstreamBridge::Consumer'
  )
  subscribe_push!
end

#subscribe_push! ⇒ Object



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 169

# Subscribes to the push consumer's delivery subject as part of its queue group.
#
# The subscription itself is delegated to create_subscription_with_fallback. The
# primary action subscribes on the connection object handed to it, and the fallback
# subscribes through the JetStream context.
#
# @return [Object] whatever create_subscription_with_fallback returns
def subscribe_push!
  subject = @cfg.push_delivery_subject
  group = @cfg.push_consumer_group_name

  # Built lazily so the description and the log line share one format.
  label = lambda do
    "push subscription for consumer #{@durable} (stream=#{stream_name}, delivery=#{subject}, queue=#{group})"
  end

  # A plain subscribe on the delivery subject; no JetStream API call is made here.
  subscribe_directly = lambda do |nc|
    nc.subscribe(subject, queue: group).tap do
      Logging.info("Created #{label.call}", tag: 'JetstreamBridge::Consumer')
    end
  end

  create_subscription_with_fallback(
    description: label.call,
    primary_check: ->(nc) { nc.respond_to?(:subscribe) },
    primary_action: subscribe_directly,
    fallback_name: :subscribe,
    fallback_available: -> { @jts.respond_to?(:subscribe) },
    fallback_action: -> { @jts.subscribe(subject, queue: group) }
  )
end

#subscribe_push_with_fallback ⇒ Object



194
195
196
197
198
199
200
201
202
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 194

# Attempts a push subscription and falls back to a pull subscription on failure.
#
# The fallback runs inside the rescue handler, so an error raised by the pull
# attempt keeps the original push failure as its cause.
#
# @return [Object] the push subscription, or the pull subscription after a fallback
def subscribe_push_with_fallback
  subscribe_push!
rescue JetstreamBridge::ConnectionError, StandardError => error
  reason = "#{error.class}: #{error.message}"
  Logging.warn(
    "Push subscription failed (#{reason}); falling back to pull subscription for #{@durable}",
    tag: 'JetstreamBridge::Consumer'
  )
  subscribe_without_verification!
end

#subscribe_without_verification! ⇒ Object



156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 156

# Creates a pull subscription without first looking the consumer up.
#
# The primary path builds the subscription manually via build_pull_subscription,
# which avoids the consumer_info call made by nats-pure's pull_subscribe. If the
# connection lacks new_inbox or subscribe, the JetStream context's own
# pull_subscribe is offered as the fallback.
#
# @return [Object] whatever create_subscription_with_fallback returns
def subscribe_without_verification!
  supports_manual_pull = lambda do |nc|
    %i[new_inbox subscribe].all? { |capability| nc.respond_to?(capability) }
  end

  options = {
    description: "pull subscription for consumer #{@durable} (stream=#{stream_name})",
    primary_check: supports_manual_pull,
    primary_action: ->(nc) { build_pull_subscription(nc) },
    fallback_name: :pull_subscribe,
    fallback_available: -> { @jts.respond_to?(:pull_subscribe) },
    fallback_action: -> { @jts.pull_subscribe(filter_subject, @durable, stream: stream_name) }
  }
  create_subscription_with_fallback(**options)
end