Class: Etcd::Client
- Inherits:
- Object
  - Object
  - Etcd::Client
- Includes:
- Constants, Loggable, Requestable
- Defined in:
- lib/etcd/client.rb,
lib/etcd/client/failover.rb,
lib/etcd/client/protocol.rb,
lib/etcd/client/observing.rb
Constant Summary
Constants included from Constants
Etcd::Constants::S_ACTION, Etcd::Constants::S_AND, Etcd::Constants::S_DIR, Etcd::Constants::S_EXPIRATION, Etcd::Constants::S_INDEX, Etcd::Constants::S_KEY, Etcd::Constants::S_KEYS, Etcd::Constants::S_LOCATION, Etcd::Constants::S_NEW_KEY, Etcd::Constants::S_PREV_VALUE, Etcd::Constants::S_SLASH, Etcd::Constants::S_TTL, Etcd::Constants::S_VALUE, Etcd::Constants::S_WATCH
Instance Attribute Summary
- #cluster ⇒ Object
  Returns the value of attribute cluster.
- #heartbeat_freq ⇒ Object
  Returns the value of attribute heartbeat_freq.
- #leader ⇒ Object
  Somewhat magic accessor method: will reinitialize leader and cluster if needed.
- #observers ⇒ Object
  Returns the value of attribute observers.
- #seed_uris ⇒ Object
  Returns the value of attribute seed_uris.
- #status ⇒ Object
  Either `:up` or `:down`.
Class Method Summary
- .connect(options = {}) ⇒ Object
  Create a new client and connect it to the etcd cluster.
Instance Method Summary
- #connect ⇒ Object
  Connects to the etcd cluster.
- #delete(key) ⇒ String
  Remove a key and its value.
- #exists?(key) ⇒ true, false
  Returns true if the specified key exists.
- #get(key) ⇒ String, Hash
  Gets the value or values for a key.
- #handle_redirected(uri, response) ⇒ Object
  Only happens on an attempted write to a follower node in the cluster.
- #info(key) ⇒ Hash
  Returns info about a key, such as TTL, expiration and index.
- #initialize(options = {}) ⇒ Client (constructor)
  A new instance of Client.
- #inspect ⇒ Object
- #key_uri(key) ⇒ Object
- #leader_uri ⇒ Object
- #observe(prefix, &handler) ⇒ #cancel, #join
  Sets up a continuous watch of a key or prefix.
- #observers_overview ⇒ Object
- #refresh_observers ⇒ Object
  Re-initiates watches after leader election.
- #refresh_observers_if_needed ⇒ Object
- #set(key, value, options = {}) ⇒ String
  Sets the value of a key.
- #start_heartbeat_if_needed ⇒ Object
- #update(key, value, expected_value, options = {}) ⇒ true, false
  Atomically sets the value for a key if the current value for the key matches the specified expected value.
- #update_cluster ⇒ Object
  Creates a Cluster instance from `@seed_uris` and stores the cluster leader information.
- #watch(prefix, options = {}) {|value, key, info| ... } ⇒ Object
  Watches a key or prefix and calls the given block with any changes.
- #watch_uri(key) ⇒ Object
Methods included from Loggable
Methods included from Requestable
#http_client, #request, #reset_http_client!
Constructor Details
#initialize(options = {}) ⇒ Client
Returns a new instance of Client.
# File 'lib/etcd/client/failover.rb', line 9
def initialize(options = {})
  @observers = {}
  @seed_uris = options[:uris] || ['http://127.0.0.1:4001']
  @heartbeat_freq = options[:heartbeat_freq].to_f
  http_client.redirect_uri_callback = method(:handle_redirected)
end
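The option defaults in the constructor can be looked at in isolation. The following is a minimal stand-alone sketch, not the library's code: `resolve_options` and `DEFAULT_URIS` are hypothetical names that mirror only the two option lookups shown above.

```ruby
# Hypothetical helper mirroring the option handling in #initialize above.
# It is not part of Etcd::Client.
DEFAULT_URIS = ['http://127.0.0.1:4001'].freeze

def resolve_options(options = {})
  {
    # Fall back to a single local node when no :uris are given.
    seed_uris: options[:uris] || DEFAULT_URIS,
    # nil.to_f is 0.0, so an omitted :heartbeat_freq becomes 0.0.
    heartbeat_freq: options[:heartbeat_freq].to_f
  }
end

defaults = resolve_options
custom = resolve_options(uris: ['http://10.0.0.1:4001'], heartbeat_freq: 5)
```

How a frequency of 0.0 is interpreted is decided by `Etcd::Heartbeat`, which is not shown on this page.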
Instance Attribute Details
#cluster ⇒ Object
Returns the value of attribute cluster.
# File 'lib/etcd/client.rb', line 9
def cluster
  @cluster
end
#heartbeat_freq ⇒ Object
Returns the value of attribute heartbeat_freq.
# File 'lib/etcd/client.rb', line 12
def heartbeat_freq
  @heartbeat_freq
end
#leader ⇒ Object
Somewhat magic accessor method: will reinitialize leader and cluster if needed.
# File 'lib/etcd/client/failover.rb', line 57
def leader
  @leader
end
#observers ⇒ Object
Returns the value of attribute observers.
# File 'lib/etcd/client.rb', line 13
def observers
  @observers
end
#seed_uris ⇒ Object
Returns the value of attribute seed_uris.
# File 'lib/etcd/client.rb', line 11
def seed_uris
  @seed_uris
end
#status ⇒ Object
Either `:up` or `:down`.
# File 'lib/etcd/client.rb', line 14
def status
  @status
end
Class Method Details
.connect(options = {}) ⇒ Object
Create a new client and connect it to the etcd cluster.
This method is the preferred way to create a new client, and is the equivalent of `Client.new(options).connect`. See #initialize and #connect for options and details.
# File 'lib/etcd/client/failover.rb', line 24
def self.connect(options = {})
  self.new(options).connect
end
Instance Method Details
#connect ⇒ Object
Connects to the etcd cluster.
# File 'lib/etcd/client/failover.rb', line 31
def connect
  update_cluster
  start_heartbeat_if_needed
  self
end
#delete(key) ⇒ String
Remove a key and its value.
The previous value is returned, or `nil` if the key did not exist.
# File 'lib/etcd/client/protocol.rb', line 71
def delete(key)
  data = request_data(:delete, key_uri(key))
  return nil unless data
  data[S_PREV_VALUE]
end
#exists?(key) ⇒ true, false
Returns true if the specified key exists.
This is a convenience method and equivalent to calling #get and checking that the value is not `nil`.
# File 'lib/etcd/client/protocol.rb', line 83
def exists?(key)
  !!get(key)
end
#get(key) ⇒ String, Hash
Gets the value or values for a key.
If the key represents a directory with direct descendants (e.g. “/foo” for “/foo/bar”) a hash of keys and values will be returned.
# File 'lib/etcd/client/protocol.rb', line 30
def get(key)
  data = request_data(:get, key_uri(key))
  return nil unless data
  if data.is_a?(Array)
    data.each_with_object({}) do |e, acc|
      acc[e[S_KEY]] = e[S_VALUE]
    end
  else
    data[S_VALUE]
  end
end
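The two return shapes of #get can be tried without a server. The sketch below is a stand-alone approximation of the branching above: `fold_response` is a hypothetical name, and the string values `'key'` and `'value'` are assumptions about what `S_KEY` and `S_VALUE` hold in `Etcd::Constants`.

```ruby
# Assumed string values for the S_KEY and S_VALUE constants; the real values
# live in Etcd::Constants and are not shown on this page.
S_KEY = 'key'.freeze
S_VALUE = 'value'.freeze

# Stand-alone version of the branching in #get: an Array response is folded
# into a { key => value } hash, a single node yields just its value.
def fold_response(data)
  return nil unless data
  if data.is_a?(Array)
    data.each_with_object({}) { |node, acc| acc[node[S_KEY]] = node[S_VALUE] }
  else
    data[S_VALUE]
  end
end

single = { 'key' => '/foo/bar', 'value' => 'bar' }
listing = [
  { 'key' => '/foo/bar', 'value' => 'bar' },
  { 'key' => '/foo/baz', 'value' => 'baz' }
]

value = fold_response(single)    # a String for a plain key
values = fold_response(listing)  # a Hash for a directory listing
```

Callers that accept both plain keys and directories therefore need to check whether they received a String or a Hash.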
#handle_redirected(uri, response) ⇒ Object
Only happens on an attempted write to a follower node in the cluster, which means that the leader has changed since the last update.
Solution: fetch a fresh cluster status.
# File 'lib/etcd/client/failover.rb', line 75
def handle_redirected(uri, response)
  update_cluster
  http_client.default_redirect_uri_callback(uri, response)
end
#info(key) ⇒ Hash
Returns info about a key, such as TTL, expiration and index.
For keys with values the returned hash will include `:key`, `:value` and `:index`. Additionally, for keys with a TTL set there will be a `:ttl` and `:expiration` (as a UTC `Time`).
For keys that represent directories with no direct descendants (e.g. “/foo” for “/foo/bar/baz”) the `:dir` key will have the value `true`.
For keys that represent directories with direct descendants (e.g. “/foo” for “/foo/bar”) a hash of keys and info will be returned.
# File 'lib/etcd/client/protocol.rb', line 102
def info(key)
  data = request_data(:get, uri(key))
  return nil unless data
  if data.is_a?(Array)
    data.each_with_object({}) do |d, acc|
      info = extract_info(d)
      info.delete(:action)
      acc[info[:key]] = info
    end
  else
    info = extract_info(data)
    info.delete(:action)
    info
  end
end
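The shape of the info hash described above can be sketched as follows. `extract_info` itself is not shown on this page, so `sketch_extract_info` is a hypothetical approximation, and the raw field names (`'key'`, `'index'`, `'ttl'`, `'expiration'`, `'dir'`) and the ISO 8601 timestamp format are assumptions about the server response.

```ruby
require 'time'

# Hypothetical sketch of an extract_info-style conversion. The raw field
# names and the timestamp format are assumptions, not taken from the
# library source.
def sketch_extract_info(node)
  info = { key: node['key'], index: node['index'] }
  info[:value] = node['value'] if node.key?('value')
  info[:dir] = true if node['dir']
  if node['ttl']
    info[:ttl] = node['ttl']
    # Normalize the expiration timestamp to a UTC Time.
    info[:expiration] = Time.iso8601(node['expiration']).utc
  end
  info
end

node = {
  'key' => '/foo/bar', 'value' => 'bar', 'index' => 7,
  'ttl' => 30, 'expiration' => '2013-12-11T12:09:08-08:00'
}
info = sketch_extract_info(node)
```

Keys without a TTL simply lack `:ttl` and `:expiration`, so `info.key?(:ttl)` is a cheap way to tell the two cases apart.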
#inspect ⇒ Object
# File 'lib/etcd/client.rb', line 16
def inspect
  %Q(<Etcd::Client #{seed_uris}>)
end
#key_uri(key) ⇒ Object
# File 'lib/etcd/client/protocol.rb', line 155
def key_uri(key)
  uri(key, S_KEYS)
end
#leader_uri ⇒ Object
# File 'lib/etcd/client/failover.rb', line 61
def leader_uri
  leader && leader.etcd
end
#observe(prefix, &handler) ⇒ #cancel, #join
Sets up a continuous watch of a key or prefix.
This method works like #watch (which is used behind the scenes), but will re-watch the key or prefix after receiving a change notification.
When re-watching, the index of the previous change notification is used, so no subsequent changes will be lost while a change is being processed.
Unlike #watch this method is asynchronous. The watch handler runs in a separate thread (currently a new thread is created for each invocation, keep this in mind if you need to watch many different keys), and can be cancelled by calling `#cancel` on the returned object.
Because of implementation details the watch handler thread will not be stopped directly when you call `#cancel`. The thread will be blocked until the next change notification (which will be ignored). This will have very little effect on performance since the thread will not be runnable. Unless you’re creating lots of observers it should not matter. If you want to make sure you wait for the thread to stop you can call `#join` on the returned object.
# File 'lib/etcd/client/observing.rb', line 34
def observe(prefix, &handler)
  ob = Observer.new(self, prefix, handler).tap(&:run)
  @observers[prefix] = ob
  ob
end
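The re-watch loop and the delayed effect of `#cancel` described above can be sketched without a server. Everything in this block is a hypothetical stand-in: `FakeWatchSource` replaces the client with a queue-backed `#watch`, and `SketchObserver` only approximates what the library's `Observer` (not shown on this page) might do. Whether the real observer passes the recorded index or the next one is not stated here; the sketch passes the recorded index.

```ruby
require 'thread'

# Hypothetical stand-in for a client: #watch blocks until a change is
# published, then yields value, key and info like the real #watch.
class FakeWatchSource
  def initialize
    @changes = Queue.new
  end

  def publish(value, key, index)
    @changes << { value: value, key: key, index: index }
  end

  def watch(_prefix, _options = {})
    info = @changes.pop # blocks until the next change arrives
    yield info[:value], info[:key], info
  end
end

# Hypothetical observer: re-watches after every notification and remembers
# the index of the last change it saw.
class SketchObserver
  def initialize(client, prefix, handler)
    @client = client
    @prefix = prefix
    @handler = handler
    @running = true
    @index = nil
  end

  def run
    @thread = Thread.new do
      while @running
        options = @index ? { index: @index } : {}
        @client.watch(@prefix, options) do |value, key, info|
          # A notification that arrives after #cancel is ignored.
          if @running
            @index = info[:index]
            @handler.call(value, key, info)
          end
        end
      end
    end
    self
  end

  # Only flips a flag; a thread blocked in #watch exits after the next change.
  def cancel
    @running = false
    self
  end

  def join
    @thread.join
    self
  end
end
```

A caller would typically keep the returned observer, call `cancel` when done, and call `join` only if it must be certain that the thread has finished.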
#observers_overview ⇒ Object
# File 'lib/etcd/client/observing.rb', line 40
def observers_overview
  observers.map do |_, observer|
    observer.pp_status
  end
end
#refresh_observers ⇒ Object
Re-initiates watches after leader election.
# File 'lib/etcd/client/observing.rb', line 52
def refresh_observers
  logger.debug("refresh_observers: enter")
  observers.each do |_, observer|
    observer.rerun unless observer.status
  end
end
#refresh_observers_if_needed ⇒ Object
# File 'lib/etcd/client/observing.rb', line 46
def refresh_observers_if_needed
  refresh_observers if observers.values.any?{|x| not x.status}
end
#set(key, value, options = {}) ⇒ String
Sets the value of a key.
Accepts an optional `:ttl` which is the number of seconds that the key should live before being automatically deleted.
# File 'lib/etcd/client/protocol.rb', line 15
def set(key, value, options = {})
  body = {:value => value}
  body[:ttl] = options[:ttl] if options[:ttl]
  data = request_data(:post, key_uri(key), body: body)
  data[S_PREV_VALUE]
end
#start_heartbeat_if_needed ⇒ Object
# File 'lib/etcd/client/failover.rb', line 66
def start_heartbeat_if_needed
  logger.debug("client - starting heartbeat")
  @heartbeat = Etcd::Heartbeat.new(self, @heartbeat_freq)
  @heartbeat.start_heartbeat_if_needed
end
#update(key, value, expected_value, options = {}) ⇒ true, false
Atomically sets the value for a key if the current value for the key matches the specified expected value.
Returns `true` when the operation succeeds, i.e. when the specified expected value matches the current value. Returns `false` otherwise.
Accepts an optional `:ttl` which is the number of seconds that the key should live before being automatically deleted.
# File 'lib/etcd/client/protocol.rb', line 58
def update(key, value, expected_value, options = {})
  body = {:value => value, :prevValue => expected_value}
  body[:ttl] = options[:ttl] if options[:ttl]
  data = request_data(:post, key_uri(key), body: body)
  !! data
end
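The compare-and-set contract of #update, next to the plain overwrite of #set, can be illustrated with an in-memory stand-in. `InMemoryStore` is hypothetical: it imitates only the documented return values, performs no HTTP requests and ignores `:ttl`. Treating a missing key as a failed update is an assumption.

```ruby
# In-memory stand-in that imitates the return values documented for #set and
# #update. It is a hypothetical illustration: no HTTP, no TTL handling.
class InMemoryStore
  def initialize
    @data = {}
    @lock = Mutex.new
  end

  # Like #set: stores the value and returns the previous one (nil if none).
  def set(key, value)
    @lock.synchronize do
      previous = @data[key]
      @data[key] = value
      previous
    end
  end

  # Like #update: writes only if the key exists and its current value equals
  # expected_value. Failing on a missing key is an assumption.
  def update(key, value, expected_value)
    @lock.synchronize do
      return false unless @data.key?(key) && @data[key] == expected_value
      @data[key] = value
      true
    end
  end

  def get(key)
    @lock.synchronize { @data[key] }
  end
end

store = InMemoryStore.new
store.set('/counter', '1')
store.update('/counter', '2', '1') # succeeds: the current value is '1'
store.update('/counter', '3', '1') # fails: the current value is now '2'
```

A typical caller retries a failed update by reading the current value again and recomputing the new one.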
#update_cluster ⇒ Object
Creates a Cluster instance from `@seed_uris` and stores the cluster leader information.
# File 'lib/etcd/client/failover.rb', line 40
def update_cluster
  logger.debug("update_cluster: enter")
  begin
    @cluster = Etcd::Cluster.init_from_uris(*seed_uris)
    @leader = @cluster.leader
    @status = :up
    logger.debug("update_cluster: after success")
    refresh_observers
    @cluster
  rescue AllNodesDownError => e
    logger.debug("update_cluster: failed")
    raise e
  end
end
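This page does not show how `Etcd::Cluster.init_from_uris` chooses a node. One plausible strategy, sketched here purely as an assumption, is to try the seed URIs in order and fail only when none responds. `first_reachable_status`, `SketchAllNodesDownError` and the `probe` callable are all hypothetical names.

```ruby
# Hypothetical error class standing in for the library's AllNodesDownError.
class SketchAllNodesDownError < StandardError; end

# Tries each seed URI in order and returns the first status a probe yields.
# `probe` is a hypothetical callable that returns cluster status for a
# reachable node and raises for an unreachable one.
def first_reachable_status(seed_uris, probe)
  seed_uris.each do |uri|
    begin
      return probe.call(uri)
    rescue StandardError
      next # node is down, try the next seed
    end
  end
  raise SketchAllNodesDownError, "no seed node reachable: #{seed_uris.inspect}"
end

# Simulated probe: only the second node answers.
probe = lambda do |uri|
  raise IOError, 'connection refused' unless uri == 'http://127.0.0.1:4002'
  { leader: uri }
end

status = first_reachable_status(
  ['http://127.0.0.1:4001', 'http://127.0.0.1:4002'], probe
)
```

Listing several seed URIs in `:uris` is what would make this kind of fallback possible when one node is down.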
#watch(prefix, options = {}) {|value, key, info| ... } ⇒ Object
Watches a key or prefix and calls the given block with any changes.
This method will block until the server replies. There is no way to cancel the call.
The parameters to the block are the value, the key and a hash of additional info. The info will contain the `:action` that caused the change (`:set`, `:delete` etc.), the `:key`, the `:value`, the `:index`, `:new_key` with the value `true` when a new key was created below the watched prefix, `:previous_value`, if any, `:ttl` and `:expiration` if applicable.
The block parameters are in the order `value`, `key` rather than `key`, `value` because you almost always want to get the new value when you watch, but not always the key, and most often not the info. With this order you can leave out the parameters you don’t need.
# File 'lib/etcd/client/protocol.rb', line 142
def watch(prefix, options = {})
  if options[:index]
    parameters = {:index => options[:index]}
    data = request_data(:post, watch_uri(prefix), query: parameters)
  else
    data = request_data(:get, watch_uri(prefix), query: {})
  end
  info = extract_info(data)
  yield info[:value], info[:key], info
end
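The effect of the `value`, `key`, `info` parameter order can be tried without a server. `fake_watch` below is a hypothetical stand-in that yields in the documented order; the info contents are made up for the example.

```ruby
# Hypothetical stand-in for #watch: yields value, key and info in the
# documented order without contacting a server.
def fake_watch
  info = { action: :set, key: '/foo/bar', value: 'bar', index: 7 }
  yield info[:value], info[:key], info
end

# A block may declare only the leading parameters it needs; Ruby drops the
# extra yielded arguments for blocks.
only_value = fake_watch { |value| value }
value_and_key = fake_watch { |value, key| [value, key] }
action = fake_watch { |_value, _key, info| info[:action] }
```

This relies on block argument semantics. A lambda passed with `&` is strict about arity and would raise if it declared fewer parameters than are yielded.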
#watch_uri(key) ⇒ Object
# File 'lib/etcd/client/protocol.rb', line 159
def watch_uri(key)
  uri(key, S_WATCH)
end