Class: LaunchDarkly::Impl::Integrations::Consul::ConsulFeatureStoreCore
- Inherits:
-
Object
- Object
- LaunchDarkly::Impl::Integrations::Consul::ConsulFeatureStoreCore
- Defined in:
- lib/ldclient-rb/impl/integrations/consul_impl.rb
Overview
Internal implementation of the Consul feature store, intended to be used with CachingStoreWrapper.
Instance Method Summary collapse
- #available? ⇒ Boolean
- #get_all_internal(kind) ⇒ Object
- #get_internal(kind, key) ⇒ Object
- #init_internal(all_data) ⇒ Object
-
#initialize(opts) ⇒ ConsulFeatureStoreCore
constructor
A new instance of ConsulFeatureStoreCore.
- #initialized_internal? ⇒ Boolean
- #stop ⇒ Object
- #upsert_internal(kind, new_item) ⇒ Object
Constructor Details
#initialize(opts) ⇒ ConsulFeatureStoreCore
Returns a new instance of ConsulFeatureStoreCore.
18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 18 def initialize(opts) unless CONSUL_ENABLED raise RuntimeError.new("can't use Consul feature store without the 'diplomat' gem") end @prefix = (opts[:prefix] || LaunchDarkly::Integrations::Consul.default_prefix) + '/' @logger = opts[:logger] || Config.default_logger Diplomat.configuration = opts[:consul_config] unless opts[:consul_config].nil? Diplomat.configuration.url = opts[:url] unless opts[:url].nil? @logger.info("ConsulFeatureStore: using Consul host at #{Diplomat.configuration.url}") end |
Instance Method Details
#available? ⇒ Boolean
122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 122 def available? # Most implementations use the initialized_internal? method as a # proxy for this check. However, since `initialized_internal?` # catches a KeyNotFound exception, and that exception can be raised # when the server goes away, we have to modify our behavior # slightly. Diplomat::Kv.get(inited_key, {}, :return, :return) true rescue false end |
#get_all_internal(kind) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 68 def get_all_internal(kind) items_out = {} results = Diplomat::Kv.get(kind_key(kind), { recurse: true }, :return) (results == "" ? [] : results).each do |result| value = result[:value] unless value.nil? item = Model.deserialize(kind, value) items_out[item[:key].to_sym] = item end end items_out end |
#get_internal(kind, key) ⇒ Object
63 64 65 66 |
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 63 def get_internal(kind, key) value = Diplomat::Kv.get(item_key(kind, key), {}, :return) # :return means "don't throw an error if not found" (value.nil? || value == "") ? nil : Model.deserialize(kind, value) end |
#init_internal(all_data) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 30 def init_internal(all_data) # Start by reading the existing keys; we will later delete any of these that weren't in all_data. unused_old_keys = Set.new keys = Diplomat::Kv.get(@prefix, { keys: true, recurse: true }, :return) unused_old_keys.merge(keys) if keys != "" ops = [] num_items = 0 # Insert or update every provided item all_data.each do |kind, items| items.values.each do |item| value = Model.serialize(kind, item) key = item_key(kind, item[:key]) ops.push({ 'KV' => { 'Verb' => 'set', 'Key' => key, 'Value' => value } }) unused_old_keys.delete(key) num_items = num_items + 1 end end # Now delete any previously existing items whose keys were not in the current data unused_old_keys.each do |key| ops.push({ 'KV' => { 'Verb' => 'delete', 'Key' => key } }) end # Now set the special key that we check in initialized_internal? ops.push({ 'KV' => { 'Verb' => 'set', 'Key' => inited_key, 'Value' => '' } }) ConsulUtil.batch_operations(ops) @logger.info { "Initialized database with #{num_items} items" } end |
#initialized_internal? ⇒ Boolean
111 112 113 114 115 116 117 118 119 120 |
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 111 def initialized_internal? # Unfortunately we need to use exceptions here, instead of the :return parameter, because with # :return there's no way to distinguish between a missing value and an empty string. begin Diplomat::Kv.get(inited_key, {}) true rescue Diplomat::KeyNotFound false end end |
#stop ⇒ Object
134 135 136 |
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 134 def stop # There's no Consul client instance to dispose of end |
#upsert_internal(kind, new_item) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 81 def upsert_internal(kind, new_item) key = item_key(kind, new_item[:key]) json = Model.serialize(kind, new_item) # We will potentially keep retrying indefinitely until someone's write succeeds while true old_value = Diplomat::Kv.get(key, { decode_values: true }, :return) if old_value.nil? || old_value == "" mod_index = 0 else old_item = Model.deserialize(kind, old_value[0]["Value"]) # Check whether the item is stale. If so, don't do the update (and return the existing item to # FeatureStoreWrapper so it can be cached) if old_item[:version] >= new_item[:version] return old_item end mod_index = old_value[0]["ModifyIndex"] end # Otherwise, try to write. We will do a compare-and-set operation, so the write will only succeed if # the key's ModifyIndex is still equal to the previous value. If the previous ModifyIndex was zero, # it means the key did not previously exist and the write will only succeed if it still doesn't exist. success = Diplomat::Kv.put(key, json, cas: mod_index) return new_item if success # If we failed, retry the whole shebang @logger.debug { "Concurrent modification detected, retrying" } end end |