Class: JetstreamBridge::Provisioner
- Inherits:
-
Object
- Object
- JetstreamBridge::Provisioner
- Defined in:
- lib/jetstream_bridge/provisioner.rb
Overview
Dedicated provisioning orchestrator to keep connection concerns separate.
Handles creating/updating stream topology and consumers. Can be used at deploy-time with admin credentials or during runtime when auto_provision is enabled.
Class Method Summary collapse
- .build_consumer_mode_map(app_a, app_b, consumer_modes, fallback_mode) ⇒ Object
- .configure_direction(direction, stream_name:, nats_url:, logger:, consumer_mode:, shared_config:) ⇒ Object
-
.provision_bidirectional!(app_a:, app_b:, stream_name: 'sync-stream', nats_url: ENV.fetch('NATS_URL', 'nats://nats:4222'), logger: Logger.new($stdout), consumer_modes: nil, consumer_mode: :pull, **shared_config) ⇒ void
Provision both directions (A->B and B->A) with shared defaults.
Instance Method Summary collapse
-
#initialize(config: JetstreamBridge.config) ⇒ Provisioner
constructor
A new instance of Provisioner.
-
#provision!(jts: nil, provision_consumer: true) ⇒ Object
Provision stream (and optionally consumer) with desired config.
-
#provision_consumer!(jts: nil) ⇒ Object
Provision durable consumer only.
-
#provision_stream!(jts: nil) ⇒ Object
Provision stream only.
Constructor Details
#initialize(config: JetstreamBridge.config) ⇒ Provisioner
Returns a new instance of Provisioner.
111 112 113 |
# File 'lib/jetstream_bridge/provisioner.rb', line 111 def initialize(config: JetstreamBridge.config) @config = config end |
Class Method Details
.build_consumer_mode_map(app_a, app_b, consumer_modes, fallback_mode) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/jetstream_bridge/provisioner.rb', line 67 def build_consumer_mode_map(app_a, app_b, consumer_modes, fallback_mode) app_a_key = app_a.to_s app_b_key = app_b.to_s normalized_fallback = ConsumerModeResolver.normalize(fallback_mode) if consumer_modes normalized = consumer_modes.transform_keys(&:to_s).transform_values do |v| ConsumerModeResolver.normalize(v) end normalized[app_a_key] ||= normalized_fallback normalized[app_b_key] ||= normalized_fallback return normalized end { app_a_key => ConsumerModeResolver.resolve(app_name: app_a_key, fallback: normalized_fallback), app_b_key => ConsumerModeResolver.resolve(app_name: app_b_key, fallback: normalized_fallback) } end |
.configure_direction(direction, stream_name:, nats_url:, logger:, consumer_mode:, shared_config:) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/jetstream_bridge/provisioner.rb', line 88 def configure_direction(direction, stream_name:, nats_url:, logger:, consumer_mode:, shared_config:) JetstreamBridge.configure do |cfg| cfg.nats_urls = nats_url cfg.app_name = direction[:app_name] cfg.destination_app = direction[:destination_app] cfg.stream_name = stream_name cfg.auto_provision = true cfg.use_outbox = false cfg.use_inbox = false cfg.logger = logger if logger cfg.consumer_mode = consumer_mode shared_config.each do |key, value| next if key.to_sym == :consumer_mode setter = "#{key}=" cfg.public_send(setter, value) if cfg.respond_to?(setter) end end end |
.provision_bidirectional!(app_a:, app_b:, stream_name: 'sync-stream', nats_url: ENV.fetch('NATS_URL', 'nats://nats:4222'), logger: Logger.new($stdout), consumer_modes: nil, consumer_mode: :pull, **shared_config) ⇒ void
This method returns an undefined value.
Provision both directions (A->B and B->A) with shared defaults.
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/jetstream_bridge/provisioner.rb', line 31 def provision_bidirectional!( app_a:, app_b:, stream_name: 'sync-stream', nats_url: ENV.fetch('NATS_URL', 'nats://nats:4222'), logger: Logger.new($stdout), consumer_modes: nil, consumer_mode: :pull, **shared_config ) modes = build_consumer_mode_map(app_a, app_b, consumer_modes, consumer_mode) [ { app_name: app_a, destination_app: app_b }, { app_name: app_b, destination_app: app_a } ].each do |direction| direction_mode = modes[direction[:app_name]] || consumer_mode logger&.info "Provisioning #{direction[:app_name]} -> #{direction[:destination_app]}" configure_direction( direction, stream_name: stream_name, nats_url: nats_url, logger: logger, consumer_mode: direction_mode, shared_config: shared_config ) begin JetstreamBridge.startup! new.provision! ensure JetstreamBridge.shutdown! end end end |
Instance Method Details
#provision!(jts: nil, provision_consumer: true) ⇒ Object
Provision stream (and optionally consumer) with desired config.
120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/jetstream_bridge/provisioner.rb', line 120 def provision!(jts: nil, provision_consumer: true) js = jts || Connection.connect!(verify_js: true) provision_stream!(jts: js) provision_consumer!(jts: js) if provision_consumer Logging.info( "Provisioned stream=#{@config.stream_name} consumer=#{@config.durable_name if provision_consumer}", tag: 'JetstreamBridge::Provisioner' ) js end |
#provision_consumer!(jts: nil) ⇒ Object
Provision durable consumer only.
152 153 154 155 156 157 158 159 160 |
# File 'lib/jetstream_bridge/provisioner.rb', line 152 def provision_consumer!(jts: nil) js = jts || Connection.connect!(verify_js: true) SubscriptionManager.new(js, @config.durable_name, @config).ensure_consumer!(force: true) Logging.info( "Consumer provisioned: #{@config.durable_name}", tag: 'JetstreamBridge::Provisioner' ) js end |
#provision_stream!(jts: nil) ⇒ Object
Provision stream only.
138 139 140 141 142 143 144 145 146 |
# File 'lib/jetstream_bridge/provisioner.rb', line 138 def provision_stream!(jts: nil) js = jts || Connection.connect!(verify_js: true) Topology.provision!(js) Logging.info( "Stream provisioned: #{@config.stream_name}", tag: 'JetstreamBridge::Provisioner' ) js end |