Class: Etcd::Client

Inherits:
Object
  • Object
Includes:
Constants, Loggable, Requestable
Defined in:
lib/etcd/client.rb,
lib/etcd/client/failover.rb,
lib/etcd/client/protocol.rb,
lib/etcd/client/observing.rb

Constant Summary

Constants included from Constants

Etcd::Constants::S_ACTION, Etcd::Constants::S_AND, Etcd::Constants::S_DIR, Etcd::Constants::S_EXPIRATION, Etcd::Constants::S_INDEX, Etcd::Constants::S_KEY, Etcd::Constants::S_KEYS, Etcd::Constants::S_LOCATION, Etcd::Constants::S_NEW_KEY, Etcd::Constants::S_PREV_VALUE, Etcd::Constants::S_SLASH, Etcd::Constants::S_TTL, Etcd::Constants::S_VALUE, Etcd::Constants::S_WATCH

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Methods included from Loggable

#logger, #reset_logger!

Methods included from Requestable

#http_client, #request, #reset_http_client!

Constructor Details

#initialize(options = {}) ⇒ Client

Returns a new instance of Client.

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :uris (Array) — default: ['http://127.0.0.1:4001']

    seed URIs of the etcd cluster nodes

  • :heartbeat_freq (Float) — default: 0.0

    frequency (in seconds) at which the leader status is checked; heartbeating starts only for non-zero values



# File 'lib/etcd/client/failover.rb', line 9

def initialize(options={})
  @observers      = {}
  @seed_uris      = options[:uris] || ['http://127.0.0.1:4001']
  @heartbeat_freq = options[:heartbeat_freq].to_f
  http_client.redirect_uri_callback = method(:handle_redirected)
end
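
The defaulting behaviour of the constructor can be checked in isolation. The following is a minimal sketch, not the library's code: `normalize_options` and `DEFAULT_SEED_URIS` are hypothetical names that mirror the two assignments in `#initialize`. It shows that a missing `:heartbeat_freq` becomes `0.0` (because `nil.to_f` is `0.0`) and that a missing `:uris` falls back to the default seed URI.

```ruby
# Hypothetical helper mirroring the defaults applied in #initialize.
# It is an illustration only and not part of the documented API.
DEFAULT_SEED_URIS = ['http://127.0.0.1:4001'].freeze

def normalize_options(options = {})
  {
    seed_uris:      options[:uris] || DEFAULT_SEED_URIS,
    # nil.to_f is 0.0, so an absent option leaves heartbeating disabled.
    heartbeat_freq: options[:heartbeat_freq].to_f
  }
end

p normalize_options
p normalize_options(uris: ['http://10.0.0.1:4001'], heartbeat_freq: 2)
```

Note that an Integer frequency such as `2` is converted to the Float `2.0`.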

Instance Attribute Details

#cluster ⇒ Object

Returns the value of attribute cluster.



# File 'lib/etcd/client.rb', line 9

def cluster
  @cluster
end

#heartbeat_freq ⇒ Object

Returns the value of attribute heartbeat_freq.



# File 'lib/etcd/client.rb', line 12

def heartbeat_freq
  @heartbeat_freq
end

#leader ⇒ Object

A somewhat magic accessor method:

  • will reinitialize the leader and the cluster if needed



# File 'lib/etcd/client/failover.rb', line 57

def leader
  @leader
end

#observers ⇒ Object

Returns the value of attribute observers.



# File 'lib/etcd/client.rb', line 13

def observers
  @observers
end

#seed_uris ⇒ Object

Returns the value of attribute seed_uris.



# File 'lib/etcd/client.rb', line 11

def seed_uris
  @seed_uris
end

#status ⇒ Object

Returns the client status, either `:up` or `:down`.



# File 'lib/etcd/client.rb', line 14

def status
  @status
end

Class Method Details

.connect(options = {}) ⇒ Object

Create a new client and connect it to the etcd cluster.

This method is the preferred way to create a new client, and is the equivalent of `Client.new(options).connect`. See #initialize and #connect for options and details.




# File 'lib/etcd/client/failover.rb', line 24

def self.connect(options={})
  self.new(options).connect
end
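
The factory works because `#connect` returns the client itself, so the call can be chained onto `new`. A minimal sketch of that pattern follows; `SketchClient` is a hypothetical stand-in that performs no network access and only illustrates the chaining.

```ruby
# Hypothetical stand-in class; it only illustrates the chaining pattern
# and does not talk to an etcd cluster.
class SketchClient
  attr_reader :connected

  def initialize(options = {})
    @options   = options
    @connected = false
  end

  # Returning self is what makes `new(options).connect` usable as a factory.
  def connect
    @connected = true
    self
  end

  def self.connect(options = {})
    new(options).connect
  end
end

client = SketchClient.connect(uris: ['http://127.0.0.1:4001'])
puts client.connected
```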

Instance Method Details

#connect ⇒ Object

Connects to the etcd cluster: refreshes the cluster state, starts the heartbeat if needed, and returns the client itself.



# File 'lib/etcd/client/failover.rb', line 31

def connect
  update_cluster
  start_heartbeat_if_needed
  self
end

#delete(key) ⇒ String

Remove a key and its value.

The previous value is returned, or `nil` if the key did not exist.

Parameters:

  • key (String)

    the key to remove

Returns:

  • (String)

    the previous value, if any



# File 'lib/etcd/client/protocol.rb', line 71

def delete(key)
  data = request_data(:delete, key_uri(key))
  return nil unless data
  data[S_PREV_VALUE]
end

#exists?(key) ⇒ true, false

Returns true if the specified key exists.

This is a convenience method and equivalent to calling #get and checking if the value is `nil`.

Returns:

  • (true, false)

    whether or not the specified key exists



# File 'lib/etcd/client/protocol.rb', line 83

def exists?(key)
  !!get(key)
end
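
Because the method body is `!!get(key)`, the result follows Ruby truthiness: only `nil` and `false` become `false`. The sketch below illustrates this with a hypothetical in-memory hash (`EXISTS_STORE`) in place of a real cluster; `sketch_get` and `sketch_exists?` are illustrative names, not library methods.

```ruby
# Hypothetical in-memory store standing in for the etcd cluster.
EXISTS_STORE = { '/foo' => 'bar', '/empty' => '' }.freeze

def sketch_get(key)
  EXISTS_STORE[key]
end

# Same double negation as in #exists?.
def sketch_exists?(key)
  !!sketch_get(key)
end

puts sketch_exists?('/foo')     # key with a value
puts sketch_exists?('/empty')   # an empty string is still truthy in Ruby
puts sketch_exists?('/missing') # nil is turned into false
```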

#get(key) ⇒ String, Hash

Gets the value or values for a key.

If the key represents a directory with direct descendants (e.g. “/foo” for “/foo/bar”) a hash of keys and values will be returned.

Parameters:

  • key (String)

    the key or prefix to retrieve

Returns:

  • (String, Hash)

    the value for the key, or a hash of keys and values when the key is a prefix.



# File 'lib/etcd/client/protocol.rb', line 30

def get(key)
  data = request_data(:get, key_uri(key))
  return nil unless data
  if data.is_a?(Array)
    data.each_with_object({}) do |e, acc|
      acc[e[S_KEY]] = e[S_VALUE]
    end
  else
    data[S_VALUE]
  end
end
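
The two return shapes can be reproduced without a server. This is a sketch under an assumption: the string keys `'key'` and `'value'` stand in for the `S_KEY` and `S_VALUE` constants, whose actual values are not shown on this page. `fold_response` is a hypothetical name for the post-processing step in `#get`.

```ruby
# Assumption: 'key' and 'value' stand in for S_KEY and S_VALUE.
# fold_response is a hypothetical name for the folding step in #get.
def fold_response(data)
  return nil unless data
  if data.is_a?(Array)
    # Directory listing: collapse the entries into a key => value hash.
    data.each_with_object({}) { |e, acc| acc[e['key']] = e['value'] }
  else
    # Single key: return just its value.
    data['value']
  end
end

p fold_response({ 'key' => '/foo', 'value' => 'bar' })
p fold_response([
  { 'key' => '/foo/bar', 'value' => '1' },
  { 'key' => '/foo/baz', 'value' => '2' }
])
```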

#handle_redirected(uri, response) ⇒ Object

Called only when a write is attempted against a follower node in the cluster, which means that the leader has changed since the last cluster update.

The solution is to fetch a fresh cluster status and then follow the redirect as usual.



# File 'lib/etcd/client/failover.rb', line 75

def handle_redirected(uri, response)
  update_cluster
  http_client.default_redirect_uri_callback(uri, response)
end

#info(key) ⇒ Hash

Returns info about a key, such as TTL, expiration and index.

For keys with values the returned hash will include `:key`, `:value` and `:index`. Additionally, for keys with a TTL set there will be a `:ttl` and `:expiration` (as a UTC `Time`).

For keys that represent directories with no direct descendants (e.g. “/foo” for “/foo/bar/baz”) the `:dir` key will have the value `true`.

For keys that represent directories with direct descendants (e.g. “/foo” for “/foo/bar”) a hash of keys and info will be returned.

Parameters:

  • key (String)

    the key or prefix to retrieve

Returns:

  • (Hash)

    a hash with info about the key; the exact contents depend on what kind of key it is.



# File 'lib/etcd/client/protocol.rb', line 102

def info(key)
  data = request_data(:get, uri(key))
  return nil unless data
  if data.is_a?(Array)
    data.each_with_object({}) do |d, acc|
      info = extract_info(d)
      info.delete(:action)
      acc[info[:key]] = info
    end
  else
    info = extract_info(data)
    info.delete(:action)
    info
  end
end

#inspect ⇒ Object



# File 'lib/etcd/client.rb', line 16

def inspect
  %Q(<Etcd::Client #{seed_uris}>)
end

#key_uri(key) ⇒ Object



# File 'lib/etcd/client/protocol.rb', line 155

def key_uri(key)
  uri(key, S_KEYS)
end

#leader_uri ⇒ Object



# File 'lib/etcd/client/failover.rb', line 61

def leader_uri
  leader && leader.etcd
end

#observe(prefix, &handler) ⇒ #cancel, #join

Sets up a continuous watch of a key or prefix.

This method works like #watch (which is used behind the scenes), but will re-watch the key or prefix after receiving a change notification.

When re-watching, the index of the previous change notification is used, so no subsequent changes will be lost while a change is being processed.

Unlike #watch, this method is asynchronous. The watch handler runs in a separate thread (currently a new thread is created for each invocation; keep this in mind if you need to watch many different keys), and can be cancelled by calling `#cancel` on the returned object.

Because of implementation details, the watch handler thread will not be stopped directly when you call `#cancel`. The thread will be blocked until the next change notification (which will be ignored). This will have very little effect on performance since the thread will not be runnable. Unless you’re creating lots of observers it should not matter. If you want to make sure you wait for the thread to stop, you can call `#join` on the returned object.

Examples:

Creating and cancelling an observer

observer = client.observe('/foo') do |value|
  # do something on changes
end
# ...
observer.cancel

Returns:

  • (#cancel, #join)

    an observer object which you can call cancel and join on



# File 'lib/etcd/client/observing.rb', line 34

def observe(prefix, &handler)
  ob = Observer.new(self, prefix, handler).tap(&:run)
  @observers[prefix] = ob
  ob
end
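
The cancellation behaviour described above (the thread only stops after the next notification, which is ignored) can be sketched without a cluster. Everything here is hypothetical: `SketchObserver` is not the library's `Observer` class, and a `Queue` stands in for the blocking watch request.

```ruby
require 'thread'

# Hypothetical observer; a Queue stands in for the blocking watch call.
class SketchObserver
  def initialize(changes, &handler)
    @changes   = changes   # Queue of change notifications
    @handler   = handler
    @cancelled = false
  end

  def run
    @thread = Thread.new do
      loop do
        value = @changes.pop   # blocks, like a pending watch request
        break if @cancelled    # the first notification after cancel is ignored
        @handler.call(value)
      end
    end
    self
  end

  def cancel
    @cancelled = true
  end

  def join
    @thread.join
  end
end

def observer_demo
  changes  = Queue.new
  handled  = Queue.new
  observer = SketchObserver.new(changes) { |v| handled << v }.run

  changes << 'first'
  seen = [handled.pop]   # wait until the handler has processed 'first'

  observer.cancel        # the thread is blocked (or about to block) in pop
  changes << 'second'    # wakes the thread, which then exits
  observer.join
  seen + Array.new(handled.size) { handled.pop }
end

p observer_demo
```

Only the notification delivered before `cancel` reaches the handler; the one that arrives afterwards merely unblocks the thread so that `join` can return.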

#observers_overview ⇒ Object



# File 'lib/etcd/client/observing.rb', line 40

def observers_overview
  observers.map do |_, observer|
    observer.pp_status
  end
end

#refresh_observers ⇒ Object

Re-initiates watches after leader election



# File 'lib/etcd/client/observing.rb', line 52

def refresh_observers
  logger.debug("refresh_observers: enter")
  observers.each do |_, observer|
    observer.rerun unless observer.status
  end
end

#refresh_observers_if_needed ⇒ Object



# File 'lib/etcd/client/observing.rb', line 46

def refresh_observers_if_needed
  refresh_observers if observers.values.any?{|x| not x.status}
end

#set(key, value, options = {}) ⇒ String

Sets the value of a key.

Accepts an optional `:ttl` which is the number of seconds that the key should live before being automatically deleted.

Parameters:

  • key (String)

    the key to set

  • value (String)

    the value to set

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :ttl (Fixnum) — default: nil

    an optional time to live (in seconds) for the key

Returns:

  • (String)

    The previous value (if any)



# File 'lib/etcd/client/protocol.rb', line 15

def set(key, value, options={})
  body       = {:value => value}
  body[:ttl] = options[:ttl] if options[:ttl]
  data       = request_data(:post, key_uri(key), body: body)
  data[S_PREV_VALUE]
end
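
How the optional `:ttl` ends up in the request body can be shown in isolation. This is a minimal sketch: `build_set_body` is a hypothetical helper that copies the first two lines of `#set` and sends nothing over the network.

```ruby
# Hypothetical helper mirroring how #set assembles its request body.
def build_set_body(value, options = {})
  body = { value: value }
  # The :ttl entry is added only when a truthy TTL was given.
  body[:ttl] = options[:ttl] if options[:ttl]
  body
end

p build_set_body('bar')
p build_set_body('bar', ttl: 30)
```

A `nil` TTL therefore leaves the `:ttl` entry out of the body entirely instead of sending an empty value.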

#start_heartbeat_if_needed ⇒ Object



# File 'lib/etcd/client/failover.rb', line 66

def start_heartbeat_if_needed
  logger.debug("client - starting heartbeat")
  @heartbeat = Etcd::Heartbeat.new(self, @heartbeat_freq)
  @heartbeat.start_heartbeat_if_needed
end

#update(key, value, expected_value, options = {}) ⇒ true, false

Atomically sets the value for a key if the current value for the key matches the specified expected value.

Returns `true` when the operation succeeds, i.e. when the specified expected value matches the current value. Returns `false` otherwise.

Accepts an optional `:ttl` which is the number of seconds that the key should live before being automatically deleted.

Parameters:

  • key (String)

    the key to set

  • value (String)

    the value to set

  • expected_value (String)

    the value to compare to the current value

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :ttl (Fixnum) — default: nil

    an optional time to live (in seconds) for the key

Returns:

  • (true, false)

    whether or not the operation succeeded



# File 'lib/etcd/client/protocol.rb', line 58

def update(key, value, expected_value, options={})
  body       = {:value => value, :prevValue => expected_value}
  body[:ttl] = options[:ttl] if options[:ttl]
  data       = request_data(:post, key_uri(key), body: body)
  !! data
end
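
The compare-and-set contract can be illustrated with an in-memory stand-in. This sketch is not how the client talks to etcd (the comparison happens on the server, driven by the `:prevValue` field in the request body); `SketchStore` is a hypothetical class that only imitates the documented return values.

```ruby
# Hypothetical in-memory store; it imitates the compare-and-set contract
# described for #update, not the HTTP exchange with etcd.
class SketchStore
  def initialize
    @data = {}
  end

  # Stores the value and returns the previous one, if any.
  def set(key, value)
    previous = @data[key]
    @data[key] = value
    previous
  end

  def get(key)
    @data[key]
  end

  # Writes only when the current value equals expected_value.
  def update(key, value, expected_value)
    return false unless @data[key] == expected_value
    @data[key] = value
    true
  end
end

store = SketchStore.new
store.set('/lock', 'free')
puts store.update('/lock', 'taken', 'free')   # expected value matches
puts store.update('/lock', 'taken', 'free')   # current value is now 'taken'
```

The second call fails because the first one already changed the value, which is the property that makes the operation usable for simple locks and leader election.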

#update_cluster ⇒ Object

Creates a Cluster instance from `@seed_uris` and stores the cluster leader information.



# File 'lib/etcd/client/failover.rb', line 40

def update_cluster
  logger.debug("update_cluster: enter")
  begin
    @cluster = Etcd::Cluster.init_from_uris(*seed_uris)
    @leader  = @cluster.leader
    @status  = :up
    logger.debug("update_cluster: after success")
    refresh_observers
    @cluster
  rescue AllNodesDownError => e
    logger.debug("update_cluster: failed")
    raise e
  end
end

#watch(prefix, options = {}) {|value, key, info| ... } ⇒ Object

Watches a key or prefix and calls the given block with any change.

This method will block until the server replies. There is no way to cancel the call.

The parameters to the block are the value, the key and a hash of additional info. The info will contain the `:action` that caused the change (`:set`, `:delete` etc.), the `:key`, the `:value`, the `:index`, `:new_key` with the value `true` when a new key was created below the watched prefix, `:previous_value`, if any, `:ttl` and `:expiration` if applicable.

The block parameters are in the order `value`, `key` rather than `key`, `value` because you almost always want to get the new value when you watch, but not always the key, and most often not the info. With this order you can leave out the parameters you don’t need.

Parameters:

  • prefix (String)

    the key or prefix to watch

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :index (Fixnum) — default: nil

    the index to start watching from

Yield Parameters:

  • value (String)

    the value of the key that changed

  • key (String)

    the key that changed

  • info (Hash)

    the info for the key that changed

Returns:

  • (Object)

    the result of the given block



# File 'lib/etcd/client/protocol.rb', line 142

def watch(prefix, options={})
  if options[:index]
    parameters = {:index => options[:index]}
    data       = request_data(:post, watch_uri(prefix), query: parameters)
  else
    data       = request_data(:get, watch_uri(prefix), query: {})
  end

  info         = extract_info(data)
  yield info[:value], info[:key], info
end
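
The parameter order pays off because a Ruby block silently ignores trailing arguments it does not declare. A minimal sketch: `sketch_watch` is a hypothetical method that yields in the same order as `#watch`, using made-up sample data instead of a server reply.

```ruby
# Hypothetical method that yields in the same order as #watch.
# The info hash contains invented sample data.
def sketch_watch
  info = { action: :set, key: '/foo/bar', value: 'baz', index: 7 }
  yield info[:value], info[:key], info
end

# Declare only the parameters you need; the rest are dropped.
sketch_watch { |value| puts value }
sketch_watch { |value, key| puts "#{key} = #{value}" }
sketch_watch { |value, key, info| puts "#{info[:action]} #{key} = #{value}" }
```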

#watch_uri(key) ⇒ Object



# File 'lib/etcd/client/protocol.rb', line 159

def watch_uri(key)
  uri(key, S_WATCH)
end