Class: NatsWork::JetStreamManager

Inherits:
Object
  • Object
show all
Defined in:
lib/natswork/jetstream_manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, prefix: 'natswork') ⇒ JetStreamManager

Returns a new instance of JetStreamManager.



12
13
14
15
16
17
# File 'lib/natswork/jetstream_manager.rb', line 12

# Sets up a manager around +connection+ with empty stream and consumer
# registries.
#
# @param connection [Object] stored in @connection for later use
# @param prefix [String] namespace stored in @prefix (defaults to 'natswork')
def initialize(connection, prefix: 'natswork')
  @connection, @prefix = connection, prefix

  # Registries start out empty and are filled in as streams and consumers
  # are created or looked up.
  @streams = {}
  @consumers = {}
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



10
11
12
# File 'lib/natswork/jetstream_manager.rb', line 10

# Read-only accessor for the connection passed to the constructor.
#
# @return [Object] the current value of @connection
def connection
  @connection
end

#prefix ⇒ Object (readonly)

Returns the value of attribute prefix.



10
11
12
# File 'lib/natswork/jetstream_manager.rb', line 10

# Read-only accessor for the naming prefix passed to the constructor.
#
# @return [Object] the current value of @prefix
def prefix
  @prefix
end

Instance Method Details

#create_consumer(stream_name, consumer_name, **options) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/natswork/jetstream_manager.rb', line 57

# Registers a durable consumer on the prefixed stream and remembers it under
# the "<stream_name>:<consumer_name>" key.
#
# The stream is addressed as "<PREFIX>_<STREAM_NAME>" (upper-cased) and the
# consumer as "<prefix>_<consumer_name>".
#
# @param stream_name [String, Symbol] logical stream name, without the prefix
# @param consumer_name [String, Symbol] logical consumer name, without the prefix
# @param options [Hash] optional overrides: :deliver_subject, :deliver_group,
#   :ack_policy, :ack_wait, :max_deliver, :filter_subject, :replay_policy,
#   :deliver_policy, :max_ack_pending
# @return [Object] whatever +add_consumer+ returns
# @raise [JetStreamError] when any StandardError occurs while creating the consumer
def create_consumer(stream_name, consumer_name, **options)
  target_stream = "#{@prefix}_#{stream_name}".upcase
  durable = "#{@prefix}_#{consumer_name}"

  # A nil or false override falls back to the default, exactly like `||`.
  setting = ->(key, fallback) { options[key] || fallback }

  consumer_config = {
    durable_name: durable,
    deliver_subject: options[:deliver_subject],
    deliver_group: setting.call(:deliver_group, durable),
    ack_policy: setting.call(:ack_policy, :explicit),
    ack_wait: setting.call(:ack_wait, 30_000_000_000), # 30 seconds in nanoseconds
    max_deliver: setting.call(:max_deliver, 3),
    filter_subject: options[:filter_subject],
    replay_policy: setting.call(:replay_policy, :instant),
    deliver_policy: setting.call(:deliver_policy, :all),
    max_ack_pending: setting.call(:max_ack_pending, 1000)
  }

  created = @connection.jetstream.add_consumer(target_stream, consumer_config)

  # Hash assignment evaluates to the stored value, so this is also the result.
  @consumers["#{stream_name}:#{consumer_name}"] = created
rescue StandardError => e
  raise JetStreamError, "Failed to create consumer #{durable}: #{e.message}"
end

#create_stream(name, subjects: nil, **options) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/natswork/jetstream_manager.rb', line 19

# Creates a JetStream stream named "<PREFIX>_<NAME>" (upper-cased) and
# remembers the result under +name+.
#
# @param name [String, Symbol] logical stream name, without the prefix
# @param subjects [Array<String>, nil] subjects bound to the stream; when nil
#   the single subject "<prefix>.<name>.*" is used
# @param options [Hash] extra stream settings forwarded to +add_stream+
#   alongside the name and subjects
# @return [Object] whatever +add_stream+ returns
# @raise [JetStreamError] when any StandardError occurs while creating the stream
def create_stream(name, subjects: nil, **options)
  stream_name = "#{@prefix}_#{name}".upcase
  subjects ||= ["#{@prefix}.#{name}.*"]

  # Forward caller-supplied settings instead of dropping them. The computed
  # name and subjects are merged last so a stray :name in +options+ can never
  # redirect the call to a different stream.
  config = options.merge(name: stream_name, subjects: subjects)

  js = @connection.jetstream
  stream = js.add_stream(**config)
  @streams[name] = stream
  stream
rescue StandardError => e
  raise JetStreamError, "Failed to create stream #{stream_name}: #{e.message}"
end

#delete_consumer(stream_name, consumer_name) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/natswork/jetstream_manager.rb', line 84

# Removes the prefixed consumer from the prefixed stream and forgets its
# "<stream_name>:<consumer_name>" registry entry.
#
# @param stream_name [String, Symbol] logical stream name, without the prefix
# @param consumer_name [String, Symbol] logical consumer name, without the prefix
# @return [true] when the delete call completes without raising
# @raise [JetStreamError] when any StandardError occurs during deletion
def delete_consumer(stream_name, consumer_name)
  target_stream = "#{@prefix}_#{stream_name}".upcase
  durable = "#{@prefix}_#{consumer_name}"

  @connection.jetstream.delete_consumer(target_stream, durable)

  # Only reached when the remote delete did not raise.
  @consumers.delete("#{stream_name}:#{consumer_name}")
  true
rescue StandardError => e
  raise JetStreamError, "Failed to delete consumer: #{e.message}"
end

#delete_stream(name) ⇒ Object



37
38
39
40
41
42
43
44
45
# File 'lib/natswork/jetstream_manager.rb', line 37

# Deletes the stream "<PREFIX>_<NAME>" (upper-cased) and drops its entry from
# the local stream registry.
#
# @param name [String, Symbol] logical stream name, without the prefix
# @return [true] when the delete call completes without raising
# @raise [JetStreamError] when any StandardError occurs during deletion
def delete_stream(name)
  full_name = "#{@prefix}_#{name}".upcase

  @connection.jetstream.delete_stream(full_name)

  # Only reached when the remote delete did not raise.
  @streams.delete(name)
  true
rescue StandardError => e
  raise JetStreamError, "Failed to delete stream #{full_name}: #{e.message}"
end

#get_stream(name) ⇒ Object



47
48
49
50
51
52
53
54
55
# File 'lib/natswork/jetstream_manager.rb', line 47

# Returns the registry entry for +name+, looking the stream up through
# +stream_info+ when nothing truthy is stored yet.
#
# Any StandardError raised during the lookup is swallowed and nil is returned
# (and stored), so a later call retries the lookup.
#
# @param name [String, Symbol] logical stream name, without the prefix
# @return [Object, nil] the stored entry, the +stream_info+ result, or nil
#   when the lookup failed
def get_stream(name)
  full_name = "#{@prefix}_#{name}".upcase

  known = @streams[name]
  return known if known

  @streams[name] =
    begin
      @connection.jetstream.stream_info(full_name)
    rescue StandardError
      nil
    end
end

#publish(subject, payload, **options) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/natswork/jetstream_manager.rb', line 115

# Publishes +payload+ to +subject+ through JetStream and returns the
# acknowledgement as a plain hash.
#
# @param subject [String] subject to publish to
# @param payload [String, Object] sent as-is when it is a String, otherwise
#   serialized with JSON.generate
# @param options [Hash] recognised keys:
#   :msg_id  - sent as the 'Nats-Msg-Id' header
#   :timeout - forwarded to the underlying publish call
# @return [Hash] { stream:, seq:, duplicate: } taken from the ack, with
#   duplicate coerced to false when the ack reports nil/false
# @raise [JetStreamError] when any StandardError occurs while publishing
def publish(subject, payload, **options)
  js = @connection.jetstream

  data = payload.is_a?(String) ? payload : JSON.generate(payload)

  # Build options for publish. The header hash is created only when there is
  # a header to send; previously any option (e.g. only :timeout) produced an
  # empty header hash that was still passed down to the client.
  publish_opts = {}
  publish_opts[:header] = { 'Nats-Msg-Id' => options[:msg_id] } if options[:msg_id]
  publish_opts[:timeout] = options[:timeout] if options[:timeout]

  # nats-pure expects (subject, payload) or (subject, payload, opts)
  ack = if publish_opts.empty?
          js.publish(subject, data)
        else
          js.publish(subject, data, **publish_opts)
        end

  {
    stream: ack.stream,
    seq: ack.seq,
    duplicate: ack.duplicate || false
  }
rescue StandardError => e
  raise JetStreamError, "Failed to publish to JetStream: #{e.message}"
end

#pull_subscribe(stream_name, consumer_name, batch: 1, timeout: 5) ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/natswork/jetstream_manager.rb', line 142

# Fetches up to +batch+ messages from the prefixed durable consumer and
# returns them wrapped in JetStreamMessage.
#
# @param stream_name [String, Symbol] logical stream name, without the prefix
# @param consumer_name [String, Symbol] logical consumer name, without the prefix
# @param batch [Integer] maximum number of messages to fetch
# @param timeout [Numeric] forwarded to +fetch+ as its :timeout option
# @return [Array<JetStreamMessage>] one wrapper per fetched message
# @raise [JetStreamError] when any StandardError occurs while subscribing or
#   fetching
def pull_subscribe(stream_name, consumer_name, batch: 1, timeout: 5)
  stream_name_full = "#{@prefix}_#{stream_name}".upcase
  consumer_name_full = "#{@prefix}_#{consumer_name}"

  js = @connection.jetstream

  # nats-pure's signature is pull_subscribe(subject, durable, params = {}):
  # the durable name is the second positional argument. Passing it as a
  # keyword would put the whole options hash into the durable slot.
  # NOTE(review): a subscription is requested on every call; confirm whether
  # it should be reused or released by the caller.
  subscription = js.pull_subscribe(nil, consumer_name_full, stream: stream_name_full)

  # fetch returns the batch as an Array and does not yield, so wrap the
  # returned messages instead of collecting them from a block.
  subscription.fetch(batch, timeout: timeout).map do |msg|
    JetStreamMessage.new(msg, js)
  end
rescue StandardError => e
  raise JetStreamError, "Failed to pull messages: #{e.message}"
end

#subscribe(stream_name, consumer_name, &block) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/natswork/jetstream_manager.rb', line 98

# Subscribes to the prefixed durable consumer and hands every delivered
# message to +block+ wrapped in JetStreamMessage.
#
# The consumer is ensured via +get_or_create_consumer+ first, and the
# subscription is opened with manual_ack: true.
#
# @param stream_name [String, Symbol] logical stream name, without the prefix
# @param consumer_name [String, Symbol] logical consumer name, without the prefix
# @yieldparam message [JetStreamMessage] wrapper around the delivered message
# @return [Object] whatever the underlying +subscribe+ call returns
# @raise [JetStreamError] when no block is given, or when any StandardError
#   occurs while ensuring the consumer or subscribing
def subscribe(stream_name, consumer_name, &block)
  # Fail fast: without a handler the subscription would still be created and
  # every delivered message would fail on `nil.call` inside the callback.
  # The method-level rescue re-raises this as JetStreamError.
  raise ArgumentError, 'a block is required to handle messages' unless block

  get_or_create_consumer(stream_name, consumer_name)

  js = @connection.jetstream
  js.subscribe(
    nil,
    durable: "#{@prefix}_#{consumer_name}",
    stream: "#{@prefix}_#{stream_name}".upcase,
    manual_ack: true
  ) do |msg|
    wrapped_msg = JetStreamMessage.new(msg, js)
    block.call(wrapped_msg)
  end
rescue StandardError => e
  raise JetStreamError, "Failed to subscribe: #{e.message}"
end