Class: JetstreamBridge::SubscriptionManager

Inherits:
Object
  • Object
show all
Defined in:
lib/jetstream_bridge/consumer/subscription_manager.rb

Overview

Encapsulates durable ensure + subscribe for a pull consumer.

Instance Method Summary collapse

Constructor Details

#initialize(jts, durable, cfg = JetstreamBridge.config) ⇒ SubscriptionManager



9
10
11
12
13
14
15
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 9

# Stores the collaborators and precomputes the desired consumer configuration
# together with its normalized form.
#
# @param jts handle that #subscribe! later calls +pull_subscribe+ on
# @param durable durable identifier handed to +build_consumer_config+
# @param cfg configuration object; defaults to +JetstreamBridge.config+
def initialize(jts, durable, cfg = JetstreamBridge.config)
  @jts = jts
  @durable = durable
  @cfg = cfg

  # filter_subject reads @cfg, so @cfg has to be assigned before this point.
  desired = build_consumer_config(@durable, filter_subject)
  @desired_cfg = desired
  @desired_cfg_norm = normalize_consumer_config(desired)
end

Instance Method Details

#desired_consumer_cfg ⇒ Object



25
26
27
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 25

# Reader for the desired consumer configuration held in @desired_cfg.
#
# @return [Object] the stored configuration, unmodified
def desired_consumer_cfg
  @desired_cfg
end

#ensure_consumer! ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 29

# Makes sure a consumer with the desired configuration is in place.
#
# Creates the consumer when no consumer info is found. Otherwise the existing
# config is normalized and compared with the normalized desired config: a match
# is only logged, a mismatch is logged and the consumer is recreated.
#
# @return [Object] the result of whichever branch ran
#   (+create_consumer!+, +log_consumer_ok+ or +recreate_consumer!+)
def ensure_consumer!
  existing = consumer_info_or_nil
  return create_consumer! unless existing

  current_norm = normalize_consumer_config(existing.config)
  return log_consumer_ok if current_norm == @desired_cfg_norm

  # Configs differ: report the difference first, then rebuild the consumer.
  log_consumer_diff(current_norm)
  recreate_consumer!
end

#filter_subject ⇒ Object



21
22
23
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 21

# Subject used when building the consumer config and when subscribing.
#
# @return [Object] the value of +@cfg.destination_subject+
def filter_subject
  @cfg.destination_subject
end

#stream_name ⇒ Object



17
18
19
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 17

# Name of the stream, as provided by the configuration object.
#
# @return [Object] the value of +@cfg.stream_name+
def stream_name
  @cfg.stream_name
end

#subscribe! ⇒ Object

Bind a pull subscriber to the existing durable.



43
44
45
46
47
48
49
50
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 43

# Binds a pull subscriber to the durable via +@jts.pull_subscribe+.
#
# The filter subject and durable are passed positionally; the stream name and
# the desired consumer config are passed as the +stream:+ and +config:+ options.
#
# @return [Object] whatever +@jts.pull_subscribe+ returns
def subscribe!
  subject = filter_subject
  stream = stream_name
  @jts.pull_subscribe(subject, @durable, stream: stream, config: desired_consumer_cfg)
end