Class: JetstreamBridge::SubscriptionManager
- Inherits: Object
- Object
- JetstreamBridge::SubscriptionManager
- Defined in:
- lib/jetstream_bridge/consumer/subscription_manager.rb
Overview
Manages durable consumer provisioning and subscription lifecycle.
Encapsulates the ensure-consumer + subscribe flow for both pull and push consumer modes, with automatic fallback between modes.
Instance Method Summary
-
#consumer_exists? ⇒ Boolean
Check if consumer exists in the stream.
-
#create_consumer_if_missing! ⇒ Object
Create consumer only if it doesn’t already exist.
- #desired_consumer_cfg ⇒ Object
-
#ensure_consumer!(**_options) ⇒ void
Ensure the durable consumer exists on the server.
- #filter_subject ⇒ Object
-
#initialize(jts, durable, cfg = JetstreamBridge.config) ⇒ SubscriptionManager
constructor
A new instance of SubscriptionManager.
-
#stream_exists? ⇒ Boolean
Check if stream exists.
- #stream_name ⇒ Object
-
#subscribe! ⇒ Object
Bind a subscriber to the existing durable consumer.
- #subscribe_pull_with_fallback ⇒ Object
- #subscribe_push! ⇒ Object
- #subscribe_push_with_fallback ⇒ Object
- #subscribe_without_verification! ⇒ Object
Constructor Details
#initialize(jts, durable, cfg = JetstreamBridge.config) ⇒ SubscriptionManager
Returns a new instance of SubscriptionManager.
18 19 20 21 22 23 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 18

# Build a manager for a single durable consumer.
#
# @param jts [Object] JetStream context used for the stream/consumer calls made by this class
# @param durable [String] durable consumer name (presumably a String; it is interpolated into log messages)
# @param cfg [Object] bridge configuration; defaults to JetstreamBridge.config
def initialize(jts, durable, cfg = JetstreamBridge.config)
  @jts = jts
  @durable = durable
  @cfg = cfg

  # filter_subject reads @cfg, so it can only be resolved once @cfg is assigned.
  subject = filter_subject
  @desired_cfg = build_consumer_config(@durable, subject)
end
Instance Method Details
#consumer_exists? ⇒ Boolean
Check if consumer exists in the stream.
73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 73

# Check whether the durable consumer exists on the stream.
#
# Asks the JetStream context for consumer info and classifies any failure by
# its message text.
#
# @return [Boolean] true when consumer_info succeeds; false when the error
#   message says 'not found', 'does not exist' or 'no responders', or when it
#   mentions 'consumer' at all
# @raise [StandardError] re-raises any error whose message matches none of the above
def consumer_exists?
  @jts.consumer_info(stream_name, @durable)
  true
rescue StandardError => e
  # Classify on the error *message*; `e..to_s` would build a Range and blow up
  # with ArgumentError instead of inspecting the failure.
  msg = e.message.to_s.downcase
  return false if msg.include?('not found') || msg.include?('does not exist') || msg.include?('no responders')

  # Re-raise unexpected errors
  raise unless msg.include?('consumer')

  false
end
#create_consumer_if_missing! ⇒ Object
Create consumer only if it doesn’t already exist. Fails if stream is missing.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# Create the durable consumer only when it is not already present on the stream.
#
# Flow visible in the listing below:
#   1. raises JetstreamBridge::ConsumerProvisioningError when skip_consumer_management?
#      is true (that helper is not shown on this page);
#   2. raises StreamNotFoundError when stream_exists? is false;
#   3. logs and returns when consumer_exists? is true;
#   4. otherwise logs and calls create_consumer! (not shown on this page).
# StreamNotFoundError is re-raised untouched. Other StandardErrors whose message contains
# 'already' or 'exists' are logged as "created by another process" and swallowed; anything
# else is logged via Logging.error and re-raised.
#
# NOTE(review): this listing lost identifiers during extraction.
#   - `raise ConsumerProvisioningError, (e) if (e)` presumably read
#     `<message helper>(e) if <predicate helper>(e)`; as printed it would raise for every error.
#   - `msg = e..to_s.downcase` presumably read `e.message.to_s.downcase`.
# Confirm both against lib/jetstream_bridge/consumer/subscription_manager.rb before relying on this text.
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 88 def create_consumer_if_missing! # In restricted environments (push + auto_provision=false), we still want fail-fast semantics. # Attempt creation and raise a clear error so operators know the consumer must be pre-provisioned # or the account must allow the minimal $JS.API consumer permissions. if skip_consumer_management? raise JetstreamBridge::ConsumerProvisioningError, "Consumer '#{@durable}' not ensured because auto_provision=false and push mode is enabled. " \ 'Provision the consumer with admin credentials or grant minimal consumer create/info permissions.' end # First, verify stream exists - fail fast with clear error if not unless stream_exists? raise StreamNotFoundError, "Stream '#{stream_name}' does not exist. " \ 'Streams must be provisioned separately ' \ '(use auto_provision=true or run provisioning with admin credentials).' end if consumer_exists? Logging.info( "Consumer #{@durable} already exists (stream=#{stream_name})", tag: 'JetstreamBridge::Consumer' ) return end Logging.info( "Consumer #{@durable} not found, auto-creating on stream #{stream_name}...", tag: 'JetstreamBridge::Consumer' ) create_consumer! rescue StreamNotFoundError raise rescue StandardError => e raise ConsumerProvisioningError, (e) if (e) # If creation fails due to consumer already existing (race condition), that's OK msg = e..to_s.downcase if msg.include?('already') || msg.include?('exists') Logging.info( "Consumer #{@durable} was created by another process", tag: 'JetstreamBridge::Consumer' ) return end Logging.error( "Failed to auto-create consumer #{@durable}: #{e.class} #{e.message}", tag: 'JetstreamBridge::Consumer' ) raise end |
#desired_consumer_cfg ⇒ Object
33 34 35 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 33

# Consumer configuration that the constructor built for this durable.
#
# @return [Object] the value stored in @desired_cfg
def desired_consumer_cfg
  @desired_cfg
end
#ensure_consumer!(**_options) ⇒ void
This method returns an undefined value.
Ensure the durable consumer exists on the server.
Skips all JetStream API calls when auto_provision is false, assuming the consumer was pre-provisioned externally.
45 46 47 48 49 50 51 52 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 45

# Ensure the durable consumer exists on the server.
#
# When auto_provision is off, nothing is checked or created: an info line is
# logged and the method returns nil. Otherwise the work is delegated to
# create_consumer_if_missing!.
#
# @param _options [Hash] accepted and ignored keyword arguments
# @return [void]
def ensure_consumer!(**_options)
  unless @cfg.auto_provision
    Logging.info(
      "Skipping consumer validation (auto_provision=false); assuming '#{@durable}' exists.",
      tag: 'JetstreamBridge::Consumer'
    )
    return
  end

  create_consumer_if_missing!
end
#filter_subject ⇒ Object
29 30 31 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 29

# Subject used as the consumer filter, read from the configuration.
#
# @return [Object] whatever @cfg.destination_subject returns
def filter_subject
  @cfg.destination_subject
end
#stream_exists? ⇒ Boolean
Check if stream exists.
57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 57

# Check whether the configured stream exists.
#
# Asks the JetStream context for stream info and classifies any failure by
# its message text.
#
# @return [Boolean] true when stream_info succeeds; false when the error
#   message says 'not found', 'does not exist' or 'no responders', or when it
#   mentions 'stream' at all
# @raise [StandardError] re-raises any error whose message matches none of the above
def stream_exists?
  @jts.stream_info(stream_name)
  true
rescue StandardError => e
  # Classify on the error *message*; `e..to_s` would build a Range and blow up
  # with ArgumentError instead of inspecting the failure.
  msg = e.message.to_s.downcase
  return false if msg.include?('not found') || msg.include?('does not exist') || msg.include?('no responders')

  # Re-raise unexpected errors
  raise unless msg.include?('stream')

  false
end
#stream_name ⇒ Object
25 26 27 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 25

# Name of the stream this manager works against, read from the configuration.
#
# @return [Object] whatever @cfg.stream_name returns
def stream_name
  @cfg.stream_name
end
#subscribe! ⇒ Object
Bind a subscriber to the existing durable consumer.
Automatically selects pull or push mode based on configuration, with fallback to the opposite mode if the primary subscription fails.
148 149 150 151 152 153 154 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 148

# Bind a subscriber to the existing durable consumer.
#
# The configured mode picks the primary path: push when @cfg.push_consumer?
# is truthy, pull otherwise. Each path handles its own fallback.
#
# @return [Object] the subscription returned by the chosen path
def subscribe!
  return subscribe_push_with_fallback if @cfg.push_consumer?

  subscribe_pull_with_fallback
end
#subscribe_pull_with_fallback ⇒ Object
204 205 206 207 208 209 210 211 212 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 204

# Try a pull subscription first; on failure, warn and subscribe in push mode.
#
# @return [Object] the pull subscription, or the push subscription when the pull attempt raised
def subscribe_pull_with_fallback
  subscribe_without_verification!
rescue JetstreamBridge::ConnectionError, StandardError => error
  warning = "Pull subscription failed (#{error.class}: #{error.message}); " \
            "falling back to push subscription for #{@durable}"
  Logging.warn(warning, tag: 'JetstreamBridge::Consumer')

  # An error raised by the push attempt is not rescued here and reaches the caller.
  subscribe_push!
end
#subscribe_push! ⇒ Object
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 169

# Subscribe to the push consumer's delivery subject.
#
# Push consumers deliver messages straight to a subject, so this only needs a
# plain subscribe on the delivery subject with the configured queue group.
# The primary and fallback actions are handed to create_subscription_with_fallback.
#
# @return [Object] whatever create_subscription_with_fallback returns
def subscribe_push!
  subject = @cfg.push_delivery_subject
  group = @cfg.push_consumer_group_name
  label = "push subscription for consumer #{@durable} " \
          "(stream=#{stream_name}, delivery=#{subject}, queue=#{group})"

  # Subscribe through the client, log the success, and hand back the subscription.
  subscribe_and_log = lambda do |client|
    client.subscribe(subject, queue: group).tap do
      Logging.info("Created #{label}", tag: 'JetstreamBridge::Consumer')
    end
  end

  create_subscription_with_fallback(
    description: label,
    primary_check: ->(client) { client.respond_to?(:subscribe) },
    primary_action: subscribe_and_log,
    fallback_name: :subscribe,
    fallback_available: -> { @jts.respond_to?(:subscribe) },
    fallback_action: -> { @jts.subscribe(subject, queue: group) }
  )
end
#subscribe_push_with_fallback ⇒ Object
194 195 196 197 198 199 200 201 202 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 194

# Try a push subscription first; on failure, warn and subscribe in pull mode.
#
# @return [Object] the push subscription, or the pull subscription when the push attempt raised
def subscribe_push_with_fallback
  subscribe_push!
rescue JetstreamBridge::ConnectionError, StandardError => error
  warning = "Push subscription failed (#{error.class}: #{error.message}); " \
            "falling back to pull subscription for #{@durable}"
  Logging.warn(warning, tag: 'JetstreamBridge::Consumer')

  # An error raised by the pull attempt is not rescued here and reaches the caller.
  subscribe_without_verification!
end
#subscribe_without_verification! ⇒ Object
156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 156

# Create a pull subscription without first calling consumer_info.
#
# The subscription is built by hand so that the permission check performed by
# nats-pure's pull_subscribe is bypassed; pull_subscribe on the JetStream
# context is offered only as the fallback action.
#
# @return [Object] whatever create_subscription_with_fallback returns
def subscribe_without_verification!
  label = "pull subscription for consumer #{@durable} (stream=#{stream_name})"

  # The hand-built path needs a client that can mint an inbox and subscribe to it.
  client_usable = ->(client) { client.respond_to?(:new_inbox) && client.respond_to?(:subscribe) }

  create_subscription_with_fallback(
    description: label,
    primary_check: client_usable,
    primary_action: ->(client) { build_pull_subscription(client) },
    fallback_name: :pull_subscribe,
    fallback_available: -> { @jts.respond_to?(:pull_subscribe) },
    fallback_action: -> { @jts.pull_subscribe(filter_subject, @durable, stream: stream_name) }
  )
end