Class: JetstreamBridge::Provisioner

Inherits:
Object
  • Object
show all
Defined in:
lib/jetstream_bridge/provisioner.rb

Overview

Dedicated provisioning orchestrator to keep connection concerns separate.

Handles creating/updating stream topology and consumers. Can be used at deploy-time with admin credentials or during runtime when auto_provision is enabled.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config: JetstreamBridge.config) ⇒ Provisioner

Returns a new instance of Provisioner.



111
112
113
# File 'lib/jetstream_bridge/provisioner.rb', line 111

# Builds a provisioner bound to a configuration object.
#
# @param config [Object] configuration kept in @config for later use;
#   defaults to whatever JetstreamBridge.config returns at call time
def initialize(config: JetstreamBridge.config)
  @config = config
end

Class Method Details

.build_consumer_mode_map(app_a, app_b, consumer_modes, fallback_mode) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/jetstream_bridge/provisioner.rb', line 67

# Builds the app-name => consumer-mode lookup for a pair of apps.
#
# Keys of the returned hash are always strings. When +consumer_modes+ is
# given, its keys are stringified, its values are normalized, and either
# app that has no truthy entry receives the normalized fallback. When it
# is absent, each app's mode is obtained from ConsumerModeResolver.resolve
# with the normalized fallback.
#
# @param app_a [#to_s] first app name
# @param app_b [#to_s] second app name
# @param consumer_modes [Hash, nil] optional per-app modes
# @param fallback_mode [Object] mode passed through ConsumerModeResolver.normalize
# @return [Hash] stringified app name => mode
def build_consumer_mode_map(app_a, app_b, consumer_modes, fallback_mode)
  names = [app_a.to_s, app_b.to_s]
  default_mode = ConsumerModeResolver.normalize(fallback_mode)

  # No explicit per-app modes: let the resolver decide for each app.
  unless consumer_modes
    return names.each_with_object({}) do |name, resolved|
      resolved[name] = ConsumerModeResolver.resolve(app_name: name, fallback: default_mode)
    end
  end

  stringified = consumer_modes.transform_keys(&:to_s)
  modes = stringified.transform_values { |mode| ConsumerModeResolver.normalize(mode) }
  # Fill in whichever of the two apps the caller did not specify.
  names.each { |name| modes[name] ||= default_mode }
  modes
end

.configure_direction(direction, stream_name:, nats_url:, logger:, consumer_mode:, shared_config:) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/jetstream_bridge/provisioner.rb', line 88

# Applies the settings for one sync direction through JetstreamBridge.configure.
#
# Fixed settings are assigned first (auto_provision on, outbox and inbox
# off), then every +shared_config+ entry whose setter the config object
# responds to is applied, so shared entries can override the fixed ones.
# The one exception is :consumer_mode, which is always taken from the
# +consumer_mode+ argument.
#
# @param direction [Hash] must provide :app_name and :destination_app
# @param stream_name [Object] value assigned to stream_name
# @param nats_url [Object] value assigned to nats_urls
# @param logger [Object, nil] assigned only when truthy
# @param consumer_mode [Object] consumer mode for this direction
# @param shared_config [Hash] extra option name => value pairs
# @return [Object] whatever JetstreamBridge.configure returns
def configure_direction(direction, stream_name:, nats_url:, logger:, consumer_mode:, shared_config:)
  JetstreamBridge.configure do |settings|
    settings.nats_urls = nats_url
    settings.app_name = direction[:app_name]
    settings.destination_app = direction[:destination_app]
    settings.stream_name = stream_name
    settings.auto_provision = true
    settings.use_outbox = false
    settings.use_inbox = false
    settings.logger = logger if logger
    settings.consumer_mode = consumer_mode

    shared_config.each do |option, value|
      # The per-direction consumer mode set above must not be overridden.
      next if option.to_sym == :consumer_mode

      writer = "#{option}="
      # Options the config object has no setter for are skipped.
      next unless settings.respond_to?(writer)

      settings.public_send(writer, value)
    end
  end
end

.provision_bidirectional!(app_a:, app_b:, stream_name: 'sync-stream', nats_url: ENV.fetch('NATS_URL', 'nats://nats:4222'), logger: Logger.new($stdout), consumer_modes: nil, consumer_mode: :pull, **shared_config) ⇒ void

This method returns an undefined value.

Provision both directions (A->B and B->A) with shared defaults.

Parameters:

  • app_a (String)

    First app name

  • app_b (String)

    Second app name

  • stream_name (String) (defaults to: 'sync-stream')

    Stream used for both directions

  • nats_url (String) (defaults to: ENV.fetch('NATS_URL', 'nats://nats:4222'))

    NATS connection URL

  • logger (Logger) (defaults to: Logger.new($stdout))

    Logger used for progress output

  • shared_config (Hash)

    Additional config applied to both directions

  • consumer_modes (Hash, nil) (defaults to: nil)

    Per-app consumer modes, e.g. { 'system_a' => :pull, 'system_b' => :push }

  • consumer_mode (Symbol) (defaults to: :pull)

    Legacy/shared consumer mode for both directions (overridden by consumer_modes)



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/jetstream_bridge/provisioner.rb', line 31

# Provision both directions (A->B and B->A) with shared defaults.
#
# For each direction the global JetstreamBridge configuration is rewritten
# via configure_direction, the bridge is started, a new provisioner runs
# provision!, and the bridge is shut down again (even if provisioning
# raises).
#
# @param app_a [String, Symbol] first app name
# @param app_b [String, Symbol] second app name
# @param stream_name [String] stream used for both directions
# @param nats_url [String] NATS connection URL
# @param logger [Logger, nil] logger used for progress output
# @param consumer_modes [Hash, nil] per-app consumer modes, e.g.
#   { 'system_a' => :pull, 'system_b' => :push }
# @param consumer_mode [Symbol] legacy/shared consumer mode for both
#   directions (overridden by consumer_modes)
# @param shared_config [Hash] additional config applied to both directions
# @return [void]
def provision_bidirectional!(
  app_a:,
  app_b:,
  stream_name: 'sync-stream',
  nats_url: ENV.fetch('NATS_URL', 'nats://nats:4222'),
  logger: Logger.new($stdout),
  consumer_modes: nil,
  consumer_mode: :pull,
  **shared_config
)
  modes = build_consumer_mode_map(app_a, app_b, consumer_modes, consumer_mode)

  [
    { app_name: app_a, destination_app: app_b },
    { app_name: app_b, destination_app: app_a }
  ].each do |direction|
    # The mode map is keyed by stringified app names, so the lookup must
    # use a string too; a Symbol app name would otherwise always miss and
    # silently fall back to the shared consumer_mode.
    direction_mode = modes[direction[:app_name].to_s] || consumer_mode
    logger&.info "Provisioning #{direction[:app_name]} -> #{direction[:destination_app]}"
    configure_direction(
      direction,
      stream_name: stream_name,
      nats_url: nats_url,
      logger: logger,
      consumer_mode: direction_mode,
      shared_config: shared_config
    )

    begin
      JetstreamBridge.startup!
      new.provision!
    ensure
      # Always release the connection before configuring the next direction.
      JetstreamBridge.shutdown!
    end
  end
end

Instance Method Details

#provision!(jts: nil, provision_consumer: true) ⇒ Object

Provision stream (and optionally consumer) with desired config.

Parameters:

  • jts (Object, nil) (defaults to: nil)

    Existing JetStream context (optional)

  • provision_consumer (Boolean) (defaults to: true)

    Whether to create/align the consumer too

Returns:

  • (Object)

    JetStream context used for provisioning



120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/jetstream_bridge/provisioner.rb', line 120

# Provision stream (and optionally consumer) with desired config.
#
# @param jts [Object, nil] existing JetStream context; when nil a new one
#   is obtained from Connection.connect!(verify_js: true)
# @param provision_consumer [Boolean] whether to create/align the consumer too
# @return [Object] JetStream context used for provisioning
def provision!(jts: nil, provision_consumer: true)
  context = jts || Connection.connect!(verify_js: true)

  provision_stream!(jts: context)
  provision_consumer!(jts: context) if provision_consumer

  stream = @config.stream_name
  # Stays nil (rendered as an empty string) when the consumer was skipped.
  consumer = @config.durable_name if provision_consumer
  Logging.info(
    "Provisioned stream=#{stream} consumer=#{consumer}",
    tag: 'JetstreamBridge::Provisioner'
  )

  context
end

#provision_consumer!(jts: nil) ⇒ Object

Provision durable consumer only.

Parameters:

  • jts (Object, nil) (defaults to: nil)

    Existing JetStream context (optional)

Returns:

  • (Object)

    JetStream context used



152
153
154
155
156
157
158
159
160
# File 'lib/jetstream_bridge/provisioner.rb', line 152

# Provision durable consumer only.
#
# Builds a SubscriptionManager for the configured durable name and asks it
# to ensure the consumer with force: true, then logs the durable name.
#
# @param jts [Object, nil] existing JetStream context; when nil a new one
#   is obtained from Connection.connect!(verify_js: true)
# @return [Object] JetStream context used
def provision_consumer!(jts: nil)
  (jts || Connection.connect!(verify_js: true)).tap do |context|
    manager = SubscriptionManager.new(context, @config.durable_name, @config)
    manager.ensure_consumer!(force: true)

    Logging.info("Consumer provisioned: #{@config.durable_name}", tag: 'JetstreamBridge::Provisioner')
  end
end

#provision_stream!(jts: nil) ⇒ Object

Provision stream only.

Parameters:

  • jts (Object, nil) (defaults to: nil)

    Existing JetStream context (optional)

Returns:

  • (Object)

    JetStream context used



138
139
140
141
142
143
144
145
146
# File 'lib/jetstream_bridge/provisioner.rb', line 138

# Provision stream only.
#
# Hands the JetStream context to Topology.provision! and logs the
# configured stream name afterwards.
#
# @param jts [Object, nil] existing JetStream context; when nil a new one
#   is obtained from Connection.connect!(verify_js: true)
# @return [Object] JetStream context used
def provision_stream!(jts: nil)
  (jts || Connection.connect!(verify_js: true)).tap do |context|
    Topology.provision!(context)

    Logging.info("Stream provisioned: #{@config.stream_name}", tag: 'JetstreamBridge::Provisioner')
  end
end