Module: NATS::JetStream::API
- Defined in:
- lib/nats/io/jetstream/api.rb
Overview
JetStream::API are the types used to interact with the JetStream API.
Defined Under Namespace
Classes: ConsumerConfig, ConsumerInfo, RawStreamMsg, SequenceInfo, StreamConfig, StreamCreateResponse, StreamInfo, StreamState
Constant Summary collapse
- Error =
When the server responds with an error from the JetStream API.
::NATS::JetStream::Error::APIError
Instance Attribute Summary collapse
- #ack_floor ⇒ SequenceInfo
- #ack_policy ⇒ String
- #ack_wait ⇒ Integer
- #bytes ⇒ Integer
- #cluster ⇒ Hash
- #config ⇒ Hash
- #consumer_count ⇒ Integer
-
#consumer_seq ⇒ Integer
The consumer sequence.
- #created ⇒ String
- #deliver_policy ⇒ String
- #delivered ⇒ SequenceInfo
- #did_create ⇒ Boolean
- #discard ⇒ String
- #duplicate_window ⇒ Integer
- #durable_name ⇒ String
- #first_seq ⇒ Integer
- #last_seq ⇒ Integer
- #max_ack_pending ⇒ Integer
- #max_age ⇒ Integer
- #max_bytes ⇒ Integer
- #max_consumers ⇒ Integer
- #max_deliver ⇒ Integer
- #max_msg_size ⇒ Integer
- #max_msgs ⇒ Integer
- #max_msgs_per_subject ⇒ Integer
- #max_waiting ⇒ Integer
- #messages ⇒ Integer
- #name ⇒ String
- #num_ack_pending ⇒ Integer
- #num_pending ⇒ Integer
- #num_redelivered ⇒ Integer
- #num_replicas ⇒ Integer
- #num_waiting ⇒ Integer
- #replay_policy ⇒ String
- #retention ⇒ String
- #state ⇒ StreamState
- #storage ⇒ String
-
#stream_name ⇒ String
Name of the stream to which the consumer belongs.
-
#stream_seq ⇒ Integer
The stream sequence.
- #subjects ⇒ Array
- #type ⇒ String
Instance Attribute Details
#ack_floor ⇒ SequenceInfo
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/nats/io/jetstream/api.rb', line 66 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#ack_policy ⇒ String
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/nats/io/jetstream/api.rb', line 104 ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :mem_storage, # NATS v2.10 features :metadata, :filter_subjects, :max_bytes, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#ack_wait ⇒ Integer
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/nats/io/jetstream/api.rb', line 104 ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :mem_storage, # NATS v2.10 features :metadata, :filter_subjects, :max_bytes, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#bytes ⇒ Integer
247 248 249 250 251 252 253 254 255 |
# File 'lib/nats/io/jetstream/api.rb', line 247 StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts, :last_seq, :last_ts, :consumer_count, keyword_init: true) do def initialize(opts={}) rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#cluster ⇒ Hash
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/nats/io/jetstream/api.rb', line 66 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#config ⇒ Hash
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/nats/io/jetstream/api.rb', line 66 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#consumer_count ⇒ Integer
247 248 249 250 251 252 253 254 255 |
# File 'lib/nats/io/jetstream/api.rb', line 247 StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts, :last_seq, :last_ts, :consumer_count, keyword_init: true) do def initialize(opts={}) rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#consumer_seq ⇒ Integer
Returns The consumer sequence.
31 32 33 34 35 36 37 38 39 40 |
# File 'lib/nats/io/jetstream/api.rb', line 31 SequenceInfo = Struct.new(:consumer_seq, :stream_seq, :last_active, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields and freeze. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#created ⇒ String
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/nats/io/jetstream/api.rb', line 66 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#deliver_policy ⇒ String
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/nats/io/jetstream/api.rb', line 104 ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :mem_storage, # NATS v2.10 features :metadata, :filter_subjects, :max_bytes, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#delivered ⇒ SequenceInfo
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/nats/io/jetstream/api.rb', line 66 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#did_create ⇒ Boolean
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/nats/io/jetstream/api.rb', line 172 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#discard ⇒ String
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/nats/io/jetstream/api.rb', line 172 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#duplicate_window ⇒ Integer
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/nats/io/jetstream/api.rb', line 172 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#durable_name ⇒ String
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/nats/io/jetstream/api.rb', line 104 ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :mem_storage, # NATS v2.10 features :metadata, :filter_subjects, :max_bytes, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#first_seq ⇒ Integer
247 248 249 250 251 252 253 254 255 |
# File 'lib/nats/io/jetstream/api.rb', line 247 StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts, :last_seq, :last_ts, :consumer_count, keyword_init: true) do def initialize(opts={}) rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#last_seq ⇒ Integer
247 248 249 250 251 252 253 254 255 |
# File 'lib/nats/io/jetstream/api.rb', line 247 StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts, :last_seq, :last_ts, :consumer_count, keyword_init: true) do def initialize(opts={}) rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#max_ack_pending ⇒ Integer
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/nats/io/jetstream/api.rb', line 104 ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :mem_storage, # NATS v2.10 features :metadata, :filter_subjects, :max_bytes, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#max_age ⇒ Integer
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/nats/io/jetstream/api.rb', line 172 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#max_bytes ⇒ Integer
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/nats/io/jetstream/api.rb', line 172 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#max_consumers ⇒ Integer
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/nats/io/jetstream/api.rb', line 172 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#max_deliver ⇒ Integer
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/nats/io/jetstream/api.rb', line 104 ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :mem_storage, # NATS v2.10 features :metadata, :filter_subjects, :max_bytes, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#max_msg_size ⇒ Integer
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/nats/io/jetstream/api.rb', line 172 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#max_msgs ⇒ Integer
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/nats/io/jetstream/api.rb', line 172 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#max_msgs_per_subject ⇒ Integer
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/nats/io/jetstream/api.rb', line 172 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#max_waiting ⇒ Integer
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/nats/io/jetstream/api.rb', line 104 ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :mem_storage, # NATS v2.10 features :metadata, :filter_subjects, :max_bytes, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#messages ⇒ Integer
247 248 249 250 251 252 253 254 255 |
# File 'lib/nats/io/jetstream/api.rb', line 247 StreamState = Struct.new(:messages, :bytes, :first_seq, :first_ts, :last_seq, :last_ts, :consumer_count, keyword_init: true) do def initialize(opts={}) rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#name ⇒ String
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/nats/io/jetstream/api.rb', line 66 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#num_ack_pending ⇒ Integer
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/nats/io/jetstream/api.rb', line 66 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#num_pending ⇒ Integer
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/nats/io/jetstream/api.rb', line 66 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#num_redelivered ⇒ Integer
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/nats/io/jetstream/api.rb', line 66 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#num_replicas ⇒ Integer
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/nats/io/jetstream/api.rb', line 172 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#num_waiting ⇒ Integer
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/nats/io/jetstream/api.rb', line 66 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#replay_policy ⇒ String
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/nats/io/jetstream/api.rb', line 104 ConsumerConfig = Struct.new(:name, :durable_name, :description, :deliver_policy, :opt_start_seq, :opt_start_time, :ack_policy, :ack_wait, :max_deliver, :backoff, :filter_subject, :replay_policy, :rate_limit_bps, :sample_freq, :max_waiting, :max_ack_pending, :flow_control, :idle_heartbeat, :headers_only, # Pull based options :max_batch, :max_expires, # Push based consumers :deliver_subject, :deliver_group, # Ephemeral inactivity threshold :inactive_threshold, # Generally inherited by parent stream and other markers, # now can be configured directly. :num_replicas, # Force memory storage :mem_storage, # NATS v2.10 features :metadata, :filter_subjects, :max_bytes, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#retention ⇒ String
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/nats/io/jetstream/api.rb', line 172 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#state ⇒ StreamState
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/nats/io/jetstream/api.rb', line 172 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#storage ⇒ String
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/nats/io/jetstream/api.rb', line 172 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#stream_name ⇒ String
Returns name of the stream to which the consumer belongs.
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/nats/io/jetstream/api.rb', line 66 ConsumerInfo = Struct.new(:type, :stream_name, :name, :created, :config, :delivered, :ack_floor, :num_ack_pending, :num_redelivered, :num_waiting, :num_pending, :cluster, :push_bound, keyword_init: true) do def initialize(opts={}) opts[:created] = Time.parse(opts[:created]) opts[:ack_floor] = SequenceInfo.new(opts[:ack_floor]) opts[:delivered] = SequenceInfo.new(opts[:delivered]) opts[:config][:ack_wait] = opts[:config][:ack_wait] / ::NATS::NANOSECONDS opts[:config] = ConsumerConfig.new(opts[:config]) opts.delete(:cluster) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#stream_seq ⇒ Integer
Returns The stream sequence.
31 32 33 34 35 36 37 38 39 40 |
# File 'lib/nats/io/jetstream/api.rb', line 31 SequenceInfo = Struct.new(:consumer_seq, :stream_seq, :last_active, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields and freeze. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) freeze end end |
#subjects ⇒ Array
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/nats/io/jetstream/api.rb', line 172 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |
#type ⇒ String
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/nats/io/jetstream/api.rb', line 172 StreamConfig = Struct.new( :name, :description, :subjects, :retention, :max_consumers, :max_msgs, :max_bytes, :discard, :max_age, :max_msgs_per_subject, :max_msg_size, :storage, :num_replicas, :no_ack, :duplicate_window, :placement, :mirror, :sources, :sealed, :deny_delete, :deny_purge, :allow_rollup_hdrs, :republish, :allow_direct, :mirror_direct, :metadata, keyword_init: true) do def initialize(opts={}) # Filter unrecognized fields just in case. rem = opts.keys - members opts.delete_if { |k| rem.include?(k) } super(opts) end end |