Class: NatsWork::JetStreamManager
- Inherits:
-
Object
- Object
- NatsWork::JetStreamManager
- Defined in:
- lib/natswork/jetstream_manager.rb
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#prefix ⇒ Object
readonly
Returns the value of attribute prefix.
Instance Method Summary collapse
- #create_consumer(stream_name, consumer_name, **options) ⇒ Object
- #create_stream(name, subjects: nil, **options) ⇒ Object
- #delete_consumer(stream_name, consumer_name) ⇒ Object
- #delete_stream(name) ⇒ Object
- #get_stream(name) ⇒ Object
-
#initialize(connection, prefix: 'natswork') ⇒ JetStreamManager
constructor
A new instance of JetStreamManager.
- #publish(subject, payload, **options) ⇒ Object
- #pull_subscribe(stream_name, consumer_name, batch: 1, timeout: 5) ⇒ Object
- #subscribe(stream_name, consumer_name, &block) ⇒ Object
Constructor Details
#initialize(connection, prefix: 'natswork') ⇒ JetStreamManager
Returns a new instance of JetStreamManager.
12 13 14 15 16 17 |
# File 'lib/natswork/jetstream_manager.rb', line 12 def initialize(connection, prefix: 'natswork') @connection = connection @prefix = prefix @streams = {} @consumers = {} end |
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
10 11 12 |
# File 'lib/natswork/jetstream_manager.rb', line 10 def connection @connection end |
#prefix ⇒ Object (readonly)
Returns the value of attribute prefix.
10 11 12 |
# File 'lib/natswork/jetstream_manager.rb', line 10 def prefix @prefix end |
Instance Method Details
#create_consumer(stream_name, consumer_name, **options) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/natswork/jetstream_manager.rb', line 57 def create_consumer(stream_name, consumer_name, **) stream_name_full = "#{@prefix}_#{stream_name}".upcase consumer_name_full = "#{@prefix}_#{consumer_name}" config = { durable_name: consumer_name_full, deliver_subject: [:deliver_subject], deliver_group: [:deliver_group] || consumer_name_full, ack_policy: [:ack_policy] || :explicit, ack_wait: [:ack_wait] || 30_000_000_000, # 30 seconds in nanoseconds max_deliver: [:max_deliver] || 3, filter_subject: [:filter_subject], replay_policy: [:replay_policy] || :instant, deliver_policy: [:deliver_policy] || :all, max_ack_pending: [:max_ack_pending] || 1000 } js = @connection.jetstream consumer = js.add_consumer(stream_name_full, config) consumer_key = "#{stream_name}:#{consumer_name}" @consumers[consumer_key] = consumer consumer rescue StandardError => e raise JetStreamError, "Failed to create consumer #{consumer_name_full}: #{e.}" end |
#create_stream(name, subjects: nil, **options) ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/natswork/jetstream_manager.rb', line 19 def create_stream(name, subjects: nil, **) stream_name = "#{@prefix}_#{name}".upcase subjects ||= ["#{@prefix}.#{name}.*"] # Start with minimal required config config = { name: stream_name, subjects: subjects } js = @connection.jetstream stream = js.add_stream(**config) @streams[name] = stream stream rescue StandardError => e raise JetStreamError, "Failed to create stream #{stream_name}: #{e.}" end |
#delete_consumer(stream_name, consumer_name) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/natswork/jetstream_manager.rb', line 84 def delete_consumer(stream_name, consumer_name) stream_name_full = "#{@prefix}_#{stream_name}".upcase consumer_name_full = "#{@prefix}_#{consumer_name}" js = @connection.jetstream js.delete_consumer(stream_name_full, consumer_name_full) consumer_key = "#{stream_name}:#{consumer_name}" @consumers.delete(consumer_key) true rescue StandardError => e raise JetStreamError, "Failed to delete consumer: #{e.}" end |
#delete_stream(name) ⇒ Object
37 38 39 40 41 42 43 44 45 |
# File 'lib/natswork/jetstream_manager.rb', line 37 def delete_stream(name) stream_name = "#{@prefix}_#{name}".upcase js = @connection.jetstream js.delete_stream(stream_name) @streams.delete(name) true rescue StandardError => e raise JetStreamError, "Failed to delete stream #{stream_name}: #{e.}" end |
#get_stream(name) ⇒ Object
47 48 49 50 51 52 53 54 55 |
# File 'lib/natswork/jetstream_manager.rb', line 47 def get_stream(name) stream_name = "#{@prefix}_#{name}".upcase @streams[name] ||= begin js = @connection.jetstream js.stream_info(stream_name) rescue StandardError nil end end |
#publish(subject, payload, **options) ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/natswork/jetstream_manager.rb', line 115 def publish(subject, payload, **) js = @connection.jetstream data = payload.is_a?(String) ? payload : JSON.generate(payload) # Build options for publish publish_opts = {} publish_opts[:header] = {} unless .empty? publish_opts[:header]['Nats-Msg-Id'] = [:msg_id] if [:msg_id] publish_opts[:timeout] = [:timeout] if [:timeout] # nats-pure expects (subject, payload) or (subject, payload, opts) ack = if publish_opts.empty? js.publish(subject, data) else js.publish(subject, data, **publish_opts) end { stream: ack.stream, seq: ack.seq, duplicate: ack.duplicate || false } rescue StandardError => e raise JetStreamError, "Failed to publish to JetStream: #{e.}" end |
#pull_subscribe(stream_name, consumer_name, batch: 1, timeout: 5) ⇒ Object
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
# File 'lib/natswork/jetstream_manager.rb', line 142 def pull_subscribe(stream_name, consumer_name, batch: 1, timeout: 5) stream_name_full = "#{@prefix}_#{stream_name}".upcase consumer_name_full = "#{@prefix}_#{consumer_name}" js = @connection.jetstream subscription = js.pull_subscribe( nil, durable: consumer_name_full, stream: stream_name_full ) = [] subscription.fetch(batch, timeout: timeout) do |msg| << JetStreamMessage.new(msg, js) end rescue StandardError => e raise JetStreamError, "Failed to pull messages: #{e.}" end |
#subscribe(stream_name, consumer_name, &block) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/natswork/jetstream_manager.rb', line 98 def subscribe(stream_name, consumer_name, &block) get_or_create_consumer(stream_name, consumer_name) js = @connection.jetstream js.subscribe( nil, durable: "#{@prefix}_#{consumer_name}", stream: "#{@prefix}_#{stream_name}".upcase, manual_ack: true ) do |msg| wrapped_msg = JetStreamMessage.new(msg, js) block.call(wrapped_msg) end rescue StandardError => e raise JetStreamError, "Failed to subscribe: #{e.}" end |