38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/nats/io/kv/manager.rb', line 38
def create_key_value(config)
config = if not config.is_a?(KeyValue::API::KeyValueConfig)
KeyValue::API::KeyValueConfig.new(config)
else
config
end
config.history ||= 1
config.replicas ||= 1
duplicate_window = 2 * 60 if config.ttl
if config.ttl < duplicate_window
duplicate_window = config.ttl
end
config.ttl = config.ttl * ::NATS::NANOSECONDS
end
stream = JetStream::API::StreamConfig.new(
name: "KV_#{config.bucket}",
description: config.description,
subjects: ["$KV.#{config.bucket}.>"],
allow_direct: config.direct,
allow_rollup_hdrs: true,
deny_delete: true,
discard: "new",
duplicate_window: duplicate_window * ::NATS::NANOSECONDS,
max_age: config.ttl,
max_bytes: config.max_bytes,
max_consumers: -1,
max_msg_size: config.max_value_size,
max_msgs: -1,
max_msgs_per_subject: config.history,
num_replicas: config.replicas,
storage: config.storage,
republish: config.republish,
)
si = add_stream(stream)
KeyValue.new(
name: config.bucket,
stream: stream.name,
pre: "$KV.#{config.bucket}.",
js: self,
direct: si.config.allow_direct
)
end
|