Class: JetstreamBridge::SubscriptionManager
- Inherits:
-
Object
- Object
- JetstreamBridge::SubscriptionManager
- Defined in:
- lib/jetstream_bridge/consumer/subscription_manager.rb
Overview
Encapsulates durable ensure + subscribe for a pull consumer.
Instance Method Summary collapse
- #desired_consumer_cfg ⇒ Object
- #ensure_consumer! ⇒ Object
- #filter_subject ⇒ Object
-
#initialize(jts, durable, cfg = JetstreamBridge.config) ⇒ SubscriptionManager
constructor
A new instance of SubscriptionManager.
- #stream_name ⇒ Object
-
#subscribe! ⇒ Object
Bind a pull subscriber to the existing durable.
Constructor Details
#initialize(jts, durable, cfg = JetstreamBridge.config) ⇒ SubscriptionManager
9 10 11 12 13 14 15 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 9 def initialize(jts, durable, cfg = JetstreamBridge.config) @jts = jts @durable = durable @cfg = cfg @desired_cfg = build_consumer_config(@durable, filter_subject) @desired_cfg_norm = normalize_consumer_config(@desired_cfg) end |
Instance Method Details
#desired_consumer_cfg ⇒ Object
25 26 27 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 25 def desired_consumer_cfg @desired_cfg end |
#ensure_consumer! ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 29 def ensure_consumer! info = consumer_info_or_nil return create_consumer! unless info have_norm = normalize_consumer_config(info.config) if have_norm == @desired_cfg_norm log_consumer_ok else log_consumer_diff(have_norm) recreate_consumer! end end |
#filter_subject ⇒ Object
21 22 23 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 21 def filter_subject @cfg.destination_subject end |
#stream_name ⇒ Object
17 18 19 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 17 def stream_name @cfg.stream_name end |
#subscribe! ⇒ Object
Bind a pull subscriber to the existing durable.
43 44 45 46 47 48 49 50 |
# File 'lib/jetstream_bridge/consumer/subscription_manager.rb', line 43 def subscribe! @jts.pull_subscribe( filter_subject, @durable, stream: stream_name, config: desired_consumer_cfg ) end |