Class: Etcd::Leader

Inherits:
Object
  • Object
show all
Defined in:
lib/etcd/leader.rb,
lib/etcd/leader/version.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(service_key, opts = {}) ⇒ Leader

Returns a new instance of Leader.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/etcd/leader.rb', line 8

def initialize(service_key, opts={})
  # Unique service keys are necessary for avoiding clashes between different services in the etcd cluster
  @service_key = service_key
  @etcd = opts[:etcd] || Etcd.client
  @node_name = opts[:name] || SecureRandom.uuid
  @ttl = opts[:ttl] || 10

  # Local Threads of execution
  @main_loop = nil
  @refresh_loop = nil

  # Some basic attributes
  @started = false
  @is_leader = false
  @current_leader = nil
  @membership_key = nil
  @modified_index = nil
end

Instance Attribute Details

#current_leaderObject (readonly)

Returns the value of attribute current_leader.



7
8
9
# File 'lib/etcd/leader.rb', line 7

def current_leader
  @current_leader
end

#is_leaderObject (readonly)

Returns the value of attribute is_leader.



7
8
9
# File 'lib/etcd/leader.rb', line 7

def is_leader
  @is_leader
end

#node_nameObject (readonly)

Returns the value of attribute node_name.



7
8
9
# File 'lib/etcd/leader.rb', line 7

def node_name
  @node_name
end

#startedObject (readonly)

Returns the value of attribute started.



7
8
9
# File 'lib/etcd/leader.rb', line 7

def started
  @started
end

Class Method Details

.versionObject



3
4
5
# File 'lib/etcd/leader/version.rb', line 3

def self.version
  "0.1.0"
end

Instance Method Details

#check_leaderObject



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/etcd/leader.rb', line 73

def check_leader
  res = @etcd.get(@service_key,sorted:true)
  nodes = res.node.children
  if @current_leader != nodes.first.value
    @current_leader = nodes.first.value
    handle_new_leader
  end
  if @membership_key == nodes.first.key
    @is_leader = true
    handle_self_elected
    res =  @etcd.watch(@membership_key)
    if res.node.value != @node_name
      stop
      start
    end
  else
    index = nodes.index{|i| i.key == @membership_key}
    @etcd.watch(nodes[index-1].key)
  end
end

#handle_new_leaderObject



60
61
62
# File 'lib/etcd/leader.rb', line 60

def handle_new_leader
  @on_new_leader.call(@node_name, @is_leader, @current_leader) if @on_new_leader
end

#handle_self_electedObject



69
70
71
# File 'lib/etcd/leader.rb', line 69

def handle_self_elected
  @on_elected.call(@node_name, @is_leader, @current_leader) if @on_elected
end

#on_new_leader(&block) ⇒ Object



56
57
58
# File 'lib/etcd/leader.rb', line 56

def on_new_leader(&block)
  @on_new_leader = block
end

#on_self_elected(&block) ⇒ Object



64
65
66
67
# File 'lib/etcd/leader.rb', line 64

def on_self_elected(&block)
  @on_elected = block
  self
end

#refreshObject



94
95
96
97
98
99
100
# File 'lib/etcd/leader.rb', line 94

def refresh
  loop do
    sleep @ttl/2
    res = @etcd.set(@membership_key, value:@node_name, ttl:@ttl, prevIndex:@modified_index)
    @modified_index = res.node.modified_index
  end
end

#setupObject



48
49
50
51
52
53
54
# File 'lib/etcd/leader.rb', line 48

def setup
  res = @etcd.create_in_order(@service_key,value:@node_name,ttl:@ttl)
  @membership_key = res.node.key
  @modified_index = res.node.modified_index
  @main_loop = Thread.new{ loop{check_leader}}
  @refresh_thread = Thread.new{ refresh }
end

#startObject



27
28
29
30
31
# File 'lib/etcd/leader.rb', line 27

def start
  return if @started
  @started = true
  setup
end

#stopObject



33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/etcd/leader.rb', line 33

def stop
  return if !@started
  puts "Stopping #{@node_name}"
  @started = false
  @refresh_thread.kill
  @main_loop.kill
  @main_loop = nil
  @refresh_thread = nil
  @etcd.delete(@membership_key)
  @membership_key = nil
  @modified_index = nil
  @is_leader = false
  @current_leader = nil
end