Module: NATS::JetStream::Manager

Defined in:
lib/nats/io/jetstream/manager.rb

Overview

A JetStream::Manager can be used to make requests to the JetStream API.

Examples:

require 'nats/client'

nc = NATS.connect("demo.nats.io")

config = JetStream::API::StreamConfig.new()
nc.jsm.add_stream(config)

Instance Method Summary collapse

Instance Method Details

#account_infoObject



255
256
257
# File 'lib/nats/io/jetstream/manager.rb', line 255

def 
  api_request("#{@prefix}.INFO")
end

#add_consumer(stream, config, params = {}) ⇒ JetStream::API::ConsumerInfo

add_consumer creates a consumer with a given config.

Parameters:

  • stream (String)

    Name of the stream.

  • config (JetStream::API::ConsumerConfig)

    Configuration of the consumer to create.

  • params (Hash) (defaults to: {})

    Options to customize API request.

Options Hash (params):

  • :timeout (Float)

    Time to wait for response.

Returns:

Raises:



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/nats/io/jetstream/manager.rb', line 102

def add_consumer(stream, config, params={})
  raise JetStream::Error::InvalidStreamName.new("nats: invalid stream name") if stream.nil? or stream.empty?
  config = if not config.is_a?(JetStream::API::ConsumerConfig)
             JetStream::API::ConsumerConfig.new(config)
           else
             config
           end
  config[:name] ||= config[:durable_name]
  req_subject = case
                when config[:name]
                  ###############################################################################
                  #                                                                             #
                  #  Using names is the supported way of creating consumers (NATS +v2.9.0.      #
                  #                                                                             #
                  ###############################################################################
                  if config[:filter_subject] && config[:filter_subject] != ">"
                    "#{@prefix}.CONSUMER.CREATE.#{stream}.#{config[:name]}.#{config[:filter_subject]}"
                  else
                   ##############################################################################
                   #                                                                            #
                   # Endpoint to support creating ANY consumer with multi-filters (NATS +v2.10) #
                   #                                                                            #
                   ##############################################################################
                    "#{@prefix}.CONSUMER.CREATE.#{stream}.#{config[:name]}"
                  end
                when config[:durable_name]
                  ###############################################################################
                  #                                                                             #
                  # Endpoint to support creating DURABLES before NATS v2.9.0.                   #
                  #                                                                             #
                  ###############################################################################
                  "#{@prefix}.CONSUMER.DURABLE.CREATE.#{stream}.#{config[:durable_name]}"
                else
                  ###############################################################################
                  #                                                                             #
                  # Endpoint to support creating EPHEMERALS before NATS v2.9.0.                 #
                  #                                                                             #
                  ###############################################################################
                  "#{@prefix}.CONSUMER.CREATE.#{stream}"
                end

  config[:ack_policy] ||= JS::Config::AckExplicit
  # Check if have to normalize ack wait so that it is in nanoseconds for Go compat.
  if config[:ack_wait]
    raise ArgumentError.new("nats: invalid ack wait") unless config[:ack_wait].is_a?(Integer)
    config[:ack_wait] = config[:ack_wait] * ::NATS::NANOSECONDS
  end
  if config[:inactive_threshold]
    raise ArgumentError.new("nats: invalid inactive threshold") unless config[:inactive_threshold].is_a?(Integer)
    config[:inactive_threshold] = config[:inactive_threshold] * ::NATS::NANOSECONDS
  end

  cfg = config.to_h.compact
  req = {
    stream_name: stream,
    config: cfg
  }

  result = api_request(req_subject, req.to_json, params)
  JetStream::API::ConsumerInfo.new(result).freeze
end

#add_stream(config, params = {}) ⇒ JetStream::API::StreamCreateResponse

add_stream creates a stream with a given config.

Parameters:

  • config (JetStream::API::StreamConfig)

    Configuration of the stream to create.

  • params (Hash) (defaults to: {})

    Options to customize API request.

Options Hash (params):

  • :timeout (Float)

    Time to wait for response.

Returns:

Raises:

  • (ArgumentError)


34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/nats/io/jetstream/manager.rb', line 34

def add_stream(config, params={})
  config = if not config.is_a?(JetStream::API::StreamConfig)
             JetStream::API::StreamConfig.new(config)
           else
             config
           end
  stream = config[:name]
  raise ArgumentError.new(":name is required to create streams") unless stream
  raise ArgumentError.new("Spaces, tabs, period (.), greater than (>) or asterisk (*) are prohibited in stream names") if stream =~ /(\s|\.|\>|\*)/
  req_subject = "#{@prefix}.STREAM.CREATE.#{stream}"

  cfg = config.to_h.compact
  result = api_request(req_subject, cfg.to_json, params)
  JetStream::API::StreamCreateResponse.new(result)
end

#consumer_info(stream, consumer, params = {}) ⇒ JetStream::API::ConsumerInfo

consumer_info retrieves the current status of a consumer.

Parameters:

  • stream (String)

    Name of the stream.

  • consumer (String)

    Name of the consumer.

  • params (Hash) (defaults to: {})

    Options to customize API request.

Options Hash (params):

  • :timeout (Float)

    Time to wait for response.

Returns:

Raises:



170
171
172
173
174
175
176
177
# File 'lib/nats/io/jetstream/manager.rb', line 170

def consumer_info(stream, consumer, params={})
  raise JetStream::Error::InvalidStreamName.new("nats: invalid stream name") if stream.nil? or stream.empty?
  raise JetStream::Error::InvalidConsumerName.new("nats: invalid consumer name") if consumer.nil? or consumer.empty?

  req_subject = "#{@prefix}.CONSUMER.INFO.#{stream}.#{consumer}"
  result = api_request(req_subject, '', params)
  JetStream::API::ConsumerInfo.new(result)
end

#delete_consumer(stream, consumer, params = {}) ⇒ Boolean

delete_consumer deletes a consumer.

Parameters:

  • stream (String)

    Name of the stream.

  • consumer (String)

    Name of the consumer.

  • params (Hash) (defaults to: {})

    Options to customize API request.

Options Hash (params):

  • :timeout (Float)

    Time to wait for response.

Returns:

  • (Boolean)

Raises:



185
186
187
188
189
190
191
192
# File 'lib/nats/io/jetstream/manager.rb', line 185

def delete_consumer(stream, consumer, params={})
  raise JetStream::Error::InvalidStreamName.new("nats: invalid stream name") if stream.nil? or stream.empty?
  raise JetStream::Error::InvalidConsumerName.new("nats: invalid consumer name") if consumer.nil? or consumer.empty?

  req_subject = "#{@prefix}.CONSUMER.DELETE.#{stream}.#{consumer}"
  result = api_request(req_subject, '', params)
  result[:success]
end

#delete_stream(stream, params = {}) ⇒ Boolean

delete_stream deletes a stream.

Parameters:

  • stream (String)

    Name of the stream.

  • params (Hash) (defaults to: {})

    Options to customize API request.

Options Hash (params):

  • :timeout (Float)

    Time to wait for response.

Returns:

  • (Boolean)

Raises:



88
89
90
91
92
93
94
# File 'lib/nats/io/jetstream/manager.rb', line 88

def delete_stream(stream, params={})
  raise JetStream::Error::InvalidStreamName.new("nats: invalid stream name") if stream.nil? or stream.empty?

  req_subject = "#{@prefix}.STREAM.DELETE.#{stream}"
  result = api_request(req_subject, '', params)
  result[:success]
end

#find_stream_name_by_subject(subject, params = {}) ⇒ String

find_stream_name_by_subject does a lookup for the stream to which the subject belongs.

Parameters:

  • subject (String)

    The subject that belongs to a stream.

  • params (Hash) (defaults to: {})

    Options to customize API request.

Options Hash (params):

  • :timeout (Float)

    Time to wait for response.

Returns:

  • (String)

    The name of the JetStream stream for the subject.

Raises:



200
201
202
203
204
205
206
207
# File 'lib/nats/io/jetstream/manager.rb', line 200

def find_stream_name_by_subject(subject, params={})
  req_subject = "#{@prefix}.STREAM.NAMES"
  req = { subject: subject }
  result = api_request(req_subject, req.to_json, params)
  raise JetStream::Error::NotFound unless result[:streams]

  result[:streams].first
end

#get_last_msg(stream_name, subject, params = {}) ⇒ Object



250
251
252
253
# File 'lib/nats/io/jetstream/manager.rb', line 250

def get_last_msg(stream_name, subject, params={})
  params[:subject] = subject
  get_msg(stream_name, params)
end

#get_msg(stream_name, params = {}) ⇒ Object

get_msg retrieves a message from the stream.

Parameters:

  • stream_name (String)

    The stream_name.

  • params (Hash) (defaults to: {})

    Options to customize API request.

  • next (Hash)

    a customizable set of options

  • seq (Hash)

    a customizable set of options

  • subject (Hash)

    a customizable set of options

  • direct (Hash)

    a customizable set of options



216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/nats/io/jetstream/manager.rb', line 216

def get_msg(stream_name, params={})
  req = {}
  case
  when params[:next]
    req[:seq] = params[:seq]
    req[:next_by_subj] = params[:subject]
  when params[:seq]
    req[:seq] = params[:seq]
  when params[:subject]
    req[:last_by_subj] = params[:subject]
  end

  data = req.to_json
  if params[:direct]
    if params[:subject] and not params[:seq]
      # last_by_subject type request requires no payload.
      data = ''
      req_subject = "#{@prefix}.DIRECT.GET.#{stream_name}.#{params[:subject]}"
    else
      req_subject = "#{@prefix}.DIRECT.GET.#{stream_name}"
    end
  else
    req_subject = "#{@prefix}.STREAM.MSG.GET.#{stream_name}"
  end
  resp = api_request(req_subject, data, direct: params[:direct])
  msg = if params[:direct]
          _lift_msg_to_raw_msg(resp)
        else
          JetStream::API::RawStreamMsg.new(resp[:message])
        end

  msg
end

#stream_info(stream, params = {}) ⇒ JetStream::API::StreamInfo

stream_info retrieves the current status of a stream.

Parameters:

  • stream (String)

    Name of the stream.

  • params (Hash) (defaults to: {})

    Options to customize API request.

Options Hash (params):

  • :timeout (Float)

    Time to wait for response.

Returns:

Raises:



55
56
57
58
59
60
61
# File 'lib/nats/io/jetstream/manager.rb', line 55

def stream_info(stream, params={})
  raise JetStream::Error::InvalidStreamName.new("nats: invalid stream name") if stream.nil? or stream.empty?

  req_subject = "#{@prefix}.STREAM.INFO.#{stream}"
  result = api_request(req_subject, '', params)
  JetStream::API::StreamInfo.new(result)
end

#update_stream(config, params = {}) ⇒ JetStream::API::StreamCreateResponse

update_stream edits an existed stream with a given config.

Parameters:

  • config (JetStream::API::StreamConfig)

    Configuration of the stream to create.

  • params (Hash) (defaults to: {})

    Options to customize API request.

Options Hash (params):

  • :timeout (Float)

    Time to wait for response.

Returns:

Raises:

  • (ArgumentError)


68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/nats/io/jetstream/manager.rb', line 68

def update_stream(config, params={})
  config = if not config.is_a?(JetStream::API::StreamConfig)
             JetStream::API::StreamConfig.new(config)
           else
             config
           end
  stream = config[:name]
  raise ArgumentError.new(":name is required to create streams") unless stream
  raise ArgumentError.new("Spaces, tabs, period (.), greater than (>) or asterisk (*) are prohibited in stream names") if stream =~ /(\s|\.|\>|\*)/
  req_subject = "#{@prefix}.STREAM.UPDATE.#{stream}"
  cfg = config.to_h.compact
  result = api_request(req_subject, cfg.to_json, params)
  JetStream::API::StreamCreateResponse.new(result)
end