Class: NATS::KeyValue
- Inherits: Object
  - Object
  - NATS::KeyValue
- Defined in:
  lib/nats/io/kv.rb,
  lib/nats/io/kv/api.rb,
  lib/nats/io/kv/errors.rb,
  lib/nats/io/kv/manager.rb,
  lib/nats/io/kv/bucket_status.rb
Defined Under Namespace
Modules: API, Manager
Classes: BadBucketError, BucketNotFoundError, BucketStatus, Entry, Error, KeyDeletedError, KeyNotFoundError, KeyWrongLastSequenceError
Constant Summary
- KV_OP = "KV-Operation"
- KV_DEL = "DEL"
- KV_PURGE = "PURGE"
- MSG_ROLLUP_SUBJECT = "sub"
- MSG_ROLLUP_ALL = "all"
- ROLLUP = "Nats-Rollup"
- EXPECTED_LAST_SUBJECT_SEQUENCE = "Nats-Expected-Last-Subject-Sequence"
Instance Method Summary
- #create(key, value) ⇒ Object
create will add the key/value pair iff it does not exist.
- #delete(key, params = {}) ⇒ Object
delete will place a delete marker for the key; unlike purge, it does not set the Nats-Rollup header, so it does not roll up previous revisions.
- #get(key, params = {}) ⇒ Object
get returns the latest value for the key.
- #initialize(opts = {}) ⇒ KeyValue (constructor)
A new instance of KeyValue.
- #purge(key) ⇒ Object
purge will remove the key and all revisions.
- #put(key, value) ⇒ Object
put will place the new value for the key into the store and return the revision number.
- #status ⇒ Object
status retrieves the status and configuration of a bucket.
- #update(key, value, params = {}) ⇒ Object
update will update the value iff the latest revision matches.
Constructor Details
#initialize(opts = {}) ⇒ KeyValue
Returns a new instance of KeyValue.
# File 'lib/nats/io/kv.rb', line 29

def initialize(opts={})
  @name = opts[:name]
  @stream = opts[:stream]
  @pre = opts[:pre]
  @js = opts[:js]
  @direct = opts[:direct]
end
Instance Method Details
#create(key, value) ⇒ Object
create will add the key/value pair iff it does not exist.
# File 'lib/nats/io/kv.rb', line 94

def create(key, value)
  pa = nil
  begin
    pa = update(key, value, last: 0)
  rescue KeyWrongLastSequenceError => err
    # In case of attempting to recreate an already deleted key,
    # the client would get a KeyWrongLastSequenceError. When this happens,
    # it is needed to fetch latest revision number and attempt to update.
    begin
      # NOTE: This reimplements the following behavior from Go client.
      #
      #   Since we have tombstones for DEL ops for watchers, this could be from that
      #   so we need to double check.
      #
      _get(key)

      # No exception so not a deleted key, so reraise the original KeyWrongLastSequenceError.
      # If it was deleted then the error exception will contain metadata
      # to recreate using the last revision.
      raise err
    rescue KeyDeletedError => err
      pa = update(key, value, last: err.entry.revision)
    end
  end

  pa
end
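The retry path in create can be hard to follow from the listing alone. The following is a minimal in-memory sketch of the same control flow, not the library's implementation: `CreateSketch`, its `Bucket` class, `lookup`, and both error classes are hypothetical stand-ins for the real bucket, `_get`, `KeyWrongLastSequenceError`, and `KeyDeletedError`.

```ruby
# Hypothetical in-memory model; none of these names exist in nats-pure.
module CreateSketch
  class WrongLastSequence < StandardError; end

  # Carries the tombstone's revision, like err.entry.revision in the listing.
  class KeyDeleted < StandardError
    attr_reader :revision

    def initialize(revision)
      super("key was deleted at revision #{revision}")
      @revision = revision
    end
  end

  class Bucket
    def initialize
      @seq = 0       # bucket-wide sequence counter
      @entries = {}  # key => { value:, revision:, deleted: }
    end

    # Write only if the key's latest revision equals `last` (0 = no entry yet).
    def update(key, value, last: 0)
      current = @entries[key] ? @entries[key][:revision] : 0
      raise WrongLastSequence unless current == last
      write(key, value, false)
    end

    # Leave a delete marker (tombstone) as the key's latest entry.
    def delete(key)
      write(key, nil, true)
    end

    # Stand-in for _get: raises KeyDeleted when the latest entry is a tombstone.
    def lookup(key)
      entry = @entries.fetch(key)
      raise KeyDeleted.new(entry[:revision]) if entry[:deleted]
      entry
    end

    # Same shape as KeyValue#create: try last: 0 first, then retry against
    # the tombstone's revision if the key turns out to have been deleted.
    def create(key, value)
      update(key, value, last: 0)
    rescue WrongLastSequence => err
      begin
        lookup(key)
        raise err # a live value exists, so the create must fail
      rescue KeyDeleted => deleted
        update(key, value, last: deleted.revision)
      end
    end

    private

    def write(key, value, deleted)
      @seq += 1
      @entries[key] = { value: value, revision: @seq, deleted: deleted }
      @seq
    end
  end
end

bucket = CreateSketch::Bucket.new
first = bucket.create("k", "v1")      # fresh key
bucket.delete("k")                    # tombstone becomes the latest entry
recreated = bucket.create("k", "v2")  # succeeds via the retry path
duplicate =
  begin
    bucket.create("k", "v3")          # key is live again, so this is rejected
  rescue CreateSketch::WrongLastSequence
    :already_exists
  end
```

In this model a create on a deleted key succeeds on the second attempt, while a create on a live key re-raises the original error, which matches the two branches in the listing above.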
#delete(key, params = {}) ⇒ Object
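delete will place a delete marker for the key. Unlike purge, it does not set the Nats-Rollup header, so it does not roll up previous revisions. When params[:last] is greater than 0, the delete is conditional on that revision being the latest one for the key.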
# File 'lib/nats/io/kv.rb', line 144

def delete(key, params={})
  hdrs = {}
  hdrs[KV_OP] = KV_DEL
  last = (params[:last] ||= 0)
  if last > 0
    hdrs[EXPECTED_LAST_SUBJECT_SEQUENCE] = last.to_s
  end
  ack = @js.publish("#{@pre}#{key}", header: hdrs)

  ack.seq
end
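To make the effect of the `last` parameter concrete, here is a small sketch that isolates only the header-building step of delete. The `DeleteSketch` module and `delete_headers` helper are hypothetical; the constant values are copied from the Constant Summary.

```ruby
# Hypothetical helper that mirrors only the header logic of KeyValue#delete.
module DeleteSketch
  KV_OP = "KV-Operation"
  KV_DEL = "DEL"
  EXPECTED_LAST_SUBJECT_SEQUENCE = "Nats-Expected-Last-Subject-Sequence"

  def self.delete_headers(params = {})
    hdrs = { KV_OP => KV_DEL }
    last = params.fetch(:last, 0)
    # The expected-sequence header is added only for a positive revision.
    hdrs[EXPECTED_LAST_SUBJECT_SEQUENCE] = last.to_s if last > 0
    hdrs
  end
end

unconditional = DeleteSketch.delete_headers         # plain delete marker
conditional = DeleteSketch.delete_headers(last: 7)  # delete only if revision 7 is latest
```

The unconditional form carries just the `KV-Operation` header, while the conditional form also carries the expected revision as a string.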
#get(key, params = {}) ⇒ Object
get returns the latest value for the key.
# File 'lib/nats/io/kv.rb', line 38

def get(key, params={})
  entry = nil
  begin
    entry = _get(key, params)
  rescue KeyDeletedError
    raise KeyNotFoundError
  end

  entry
end
#purge(key) ⇒ Object
purge will remove the key and all revisions.
# File 'lib/nats/io/kv.rb', line 157

def purge(key)
  hdrs = {}
  hdrs[KV_OP] = KV_PURGE
  hdrs[ROLLUP] = MSG_ROLLUP_SUBJECT
  @js.publish("#{@pre}#{key}", header: hdrs)
end
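The sketch below shows what purge hands to the JetStream context, using a recording stub in place of a live connection. `PurgeSketch`, `RecordingJS`, and the free-standing `purge` function are hypothetical, and the `$KV.demo.` prefix is an assumed value for `@pre`; only the constants and the publish call shape come from the listing.

```ruby
# Hypothetical stub-based sketch; nothing here talks to a NATS server.
module PurgeSketch
  KV_OP = "KV-Operation"
  KV_PURGE = "PURGE"
  ROLLUP = "Nats-Rollup"
  MSG_ROLLUP_SUBJECT = "sub"

  # Stand-in for the JetStream context: it only records what was published.
  class RecordingJS
    Ack = Struct.new(:seq)
    attr_reader :published

    def initialize
      @published = []
    end

    def publish(subject, payload = "", header: {})
      @published << { subject: subject, payload: payload, header: header }
      Ack.new(@published.size)
    end
  end

  # Same steps as KeyValue#purge, written as a free function for the sketch.
  def self.purge(js, prefix, key)
    hdrs = { KV_OP => KV_PURGE, ROLLUP => MSG_ROLLUP_SUBJECT }
    js.publish("#{prefix}#{key}", header: hdrs)
  end
end

js = PurgeSketch::RecordingJS.new
PurgeSketch.purge(js, '$KV.demo.', "color") # prefix is an assumed example value
purge_message = js.published.last
```

The recorded message has an empty payload and two headers: the operation marker and a rollup scoped to the key's subject (`sub`), which is what distinguishes purge from delete.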
#put(key, value) ⇒ Object
put will place the new value for the key into the store and return the revision number.
# File 'lib/nats/io/kv.rb', line 88

def put(key, value)
  ack = @js.publish("#{@pre}#{key}", value)
  ack.seq
end
#status ⇒ Object
status retrieves the status and configuration of a bucket.
# File 'lib/nats/io/kv.rb', line 165

def status
  info = @js.stream_info(@stream)
  BucketStatus.new(info, @name)
end
#update(key, value, params = {}) ⇒ Object
update will update the value iff the latest revision matches.
# File 'lib/nats/io/kv.rb', line 125

def update(key, value, params={})
  hdrs = {}
  last = (params[:last] ||= 0)
  hdrs[EXPECTED_LAST_SUBJECT_SEQUENCE] = last.to_s
  ack = nil
  begin
    ack = @js.publish("#{@pre}#{key}", value, header: hdrs)
  rescue NATS::JetStream::Error::APIError => err
    if err.err_code == 10071
      raise KeyWrongLastSequenceError.new(err.description)
    else
      raise err
    end
  end

  ack.seq
end
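update always sends the expected-sequence header, so omitting `:last` asks for revision 0, which is the same request create makes. The sketch below models that compare-and-set rule in memory. `UpdateSketch`, its `Bucket` class, and `WrongLastSequence` are hypothetical stand-ins; in the real client the check happens on the server, and the listing maps API error code 10071 to `KeyWrongLastSequenceError`.

```ruby
# Hypothetical in-memory model of the expected-last-sequence check.
module UpdateSketch
  class WrongLastSequence < StandardError; end

  class Bucket
    def initialize
      @seq = 0            # bucket-wide sequence counter
      @last = Hash.new(0) # latest revision per key (0 = never written)
      @values = {}
    end

    # Write only when the caller's expected revision equals the key's
    # latest revision, then return the new revision number.
    def update(key, value, last: 0)
      unless @last[key] == last
        raise WrongLastSequence, "expected #{last}, latest is #{@last[key]}"
      end
      @seq += 1
      @last[key] = @seq
      @values[key] = value
      @seq
    end
  end
end

bucket = UpdateSketch::Bucket.new
rev1 = bucket.update("color", "red")              # last defaults to 0: key must be new
rev2 = bucket.update("color", "blue", last: rev1) # matches the latest revision
stale =
  begin
    bucket.update("color", "green", last: rev1)   # rev1 is no longer the latest
  rescue UpdateSketch::WrongLastSequence
    :rejected
  end
```

A typical caller would read the entry, keep its revision, and pass it back as `last:` so that a concurrent writer causes a rejection instead of a silent overwrite.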