Class: NATS::KeyValue

Inherits:
Object
  • Object
show all
Defined in:
lib/nats/io/kv.rb,
lib/nats/io/kv/api.rb,
lib/nats/io/kv/errors.rb,
lib/nats/io/kv/manager.rb,
lib/nats/io/kv/bucket_status.rb

Defined Under Namespace

Modules: API, Manager

Classes: BadBucketError, BucketNotFoundError, BucketStatus, Entry, Error, KeyDeletedError, KeyNotFoundError, KeyWrongLastSequenceError

Constant Summary collapse

KV_OP =
"KV-Operation"
KV_DEL =
"DEL"
KV_PURGE =
"PURGE"
MSG_ROLLUP_SUBJECT =
"sub"
MSG_ROLLUP_ALL =
"all"
ROLLUP =
"Nats-Rollup"
EXPECTED_LAST_SUBJECT_SEQUENCE =
"Nats-Expected-Last-Subject-Sequence"

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ KeyValue

Returns a new instance of KeyValue.



29
30
31
32
33
34
35
# File 'lib/nats/io/kv.rb', line 29

# Builds a KeyValue handle from an options hash.
#
# @param opts [Hash] the keys read are :name, :stream, :pre, :js and :direct;
#   each is stored in the instance variable of the same name (nil when absent).
def initialize(opts={})
  @name, @stream, @pre, @js, @direct =
    opts.values_at(:name, :stream, :pre, :js, :direct)
end

Instance Method Details

#create(key, value) ⇒ Object

create will add the key/value pair iff it does not exist.



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/nats/io/kv.rb', line 94

# Adds the key/value pair only if the key does not currently exist.
#
# The first attempt is an update that expects last sequence 0. If that is
# rejected with KeyWrongLastSequenceError, the key is looked up with _get:
#
# * when the lookup raises KeyDeletedError, the update is retried using the
#   revision of the entry carried by that error;
# * when the lookup succeeds, the key is live and the original
#   KeyWrongLastSequenceError is re-raised.
#
# @param key [String] key to create
# @param value [Object] value handed to #update
# @return [Object] whatever #update returns
# @raise [KeyWrongLastSequenceError] when the key exists and is not deleted
def create(key, value)
  update(key, value, last: 0)
rescue KeyWrongLastSequenceError => wrong_seq
  begin
    # NOTE: This mirrors the Go client. DEL operations leave tombstones for
    # watchers, so the rejection may have come from a deleted key and has to
    # be double checked.
    _get(key)

    # The lookup did not raise, so the key was not deleted: surface the
    # original error.
    raise wrong_seq
  rescue KeyDeletedError => deleted
    # The deleted-key error carries the entry whose revision is needed to
    # recreate the key.
    update(key, value, last: deleted.entry.revision)
  end
end

#delete(key, params = {}) ⇒ Object

delete will place a delete marker for the key; unlike purge, it sends no rollup header, so previous revisions are left in place.



144
145
146
147
148
149
150
151
152
153
154
# File 'lib/nats/io/kv.rb', line 144

# Publishes a delete marker for +key+: a message sent without a payload
# argument whose KV-Operation header is DEL.
#
# @param key [String] key name, appended to the subject prefix held in @pre
# @param params [Hash] options; only :last is read and the hash is left
#   untouched
# @option params [Integer] :last when greater than zero, sent as the
#   Nats-Expected-Last-Subject-Sequence header
# @return [Object] the +seq+ of the publish ack
def delete(key, params={})
  hdrs = { KV_OP => KV_DEL }

  # Read the option without writing the default back: `params[:last] ||= 0`
  # would modify the caller's hash and raise on a frozen one.
  last = params[:last] || 0
  hdrs[EXPECTED_LAST_SUBJECT_SEQUENCE] = last.to_s if last > 0

  @js.publish("#{@pre}#{key}", header: hdrs).seq
end

#get(key, params = {}) ⇒ Object

get returns the latest value for the key.



38
39
40
41
42
43
44
45
46
47
# File 'lib/nats/io/kv.rb', line 38

# Returns the entry that _get finds for +key+.
#
# A KeyDeletedError raised by the lookup is reported to the caller as
# KeyNotFoundError.
#
# @param key [String] key to look up
# @param params [Hash] passed through to _get unchanged
# @return [Object] the value returned by _get
# @raise [KeyNotFoundError] when _get raises KeyDeletedError
def get(key, params={})
  _get(key, params)
rescue KeyDeletedError
  raise KeyNotFoundError
end

#purge(key) ⇒ Object

purge will remove the key and all revisions.



157
158
159
160
161
162
# File 'lib/nats/io/kv.rb', line 157

# Publishes a purge marker for +key+: a message sent without a payload
# argument carrying the KV-Operation header set to PURGE and the Nats-Rollup
# header set to "sub".
#
# @param key [String] key name, appended to the subject prefix held in @pre
# @return [Object] the result of the publish call
def purge(key)
  rollup_headers = {
    KV_OP => KV_PURGE,
    ROLLUP => MSG_ROLLUP_SUBJECT
  }
  @js.publish("#{@pre}#{key}", header: rollup_headers)
end

#put(key, value) ⇒ Object

put will place the new value for the key into the store and return the revision number.



88
89
90
91
# File 'lib/nats/io/kv.rb', line 88

# Publishes +value+ under +key+ and returns the +seq+ of the publish ack,
# which serves as the revision number.
#
# @param key [String] key name, appended to the subject prefix held in @pre
# @param value [Object] payload handed to the publish call
# @return [Object] the +seq+ of the publish ack
def put(key, value)
  subject = "#{@pre}#{key}"
  @js.publish(subject, value).seq
end

#status ⇒ Object

status retrieves the status and configuration of a bucket.



165
166
167
168
# File 'lib/nats/io/kv.rb', line 165

# Fetches the stream info for the stream named by @stream and wraps it,
# together with the bucket name in @name, in a BucketStatus.
#
# @return [BucketStatus]
def status
  BucketStatus.new(@js.stream_info(@stream), @name)
end

#update(key, value, params = {}) ⇒ Object

update will update the value iff the latest revision matches.



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/nats/io/kv.rb', line 125

# Publishes +value+ for +key+ guarded by the
# Nats-Expected-Last-Subject-Sequence header, so the write is only meant to
# succeed when the latest revision matches.
#
# @param key [String] key name, appended to the subject prefix held in @pre
# @param value [Object] payload handed to the publish call
# @param params [Hash] options; only :last is read and the hash is left
#   untouched
# @option params [Integer] :last expected last sequence; 0 when absent
# @return [Object] the +seq+ of the publish ack
# @raise [KeyWrongLastSequenceError] when the publish fails with an APIError
#   whose err_code is 10071
# @raise [NATS::JetStream::Error::APIError] any other API error, re-raised
def update(key, value, params={})
  # Read the option without writing the default back: `params[:last] ||= 0`
  # would modify the caller's hash and raise on a frozen one.
  last = params[:last] || 0
  hdrs = { EXPECTED_LAST_SUBJECT_SEQUENCE => last.to_s }

  begin
    ack = @js.publish("#{@pre}#{key}", value, header: hdrs)
  rescue NATS::JetStream::Error::APIError => err
    # 10071 is the API error code treated here as a sequence mismatch
    # (presumably JetStream's "wrong last sequence" code; verify against the
    # server error list).
    raise KeyWrongLastSequenceError.new(err.description) if err.err_code == 10071

    raise
  end

  ack.seq
end