Class: ZK::Election::Base

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/zk/election.rb

Direct Known Subclasses

Candidate, Observer

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, name, opts = {}) ⇒ Base

Returns a new instance of Base.



69
70
71
72
73
74
75
# File 'lib/zk/election.rb', line 69

def initialize(client, name, opts={})
  @zk = client
  @name = name
  opts = DEFAULT_OPTS.merge(opts)
  @root_election_node = opts[:root_election_node]
  @mutex = Monitor.new
end

Instance Attribute Details

#root_election_nodeObject (readonly)

Returns the value of attribute root_election_node.



67
68
69
# File 'lib/zk/election.rb', line 67

def root_election_node
  @root_election_node
end

#vote_pathObject (readonly)

Returns the value of attribute vote_path.



67
68
69
# File 'lib/zk/election.rb', line 67

def vote_path
  @vote_path
end

#zkObject (readonly)

Returns the value of attribute zk.



67
68
69
# File 'lib/zk/election.rb', line 67

def zk
  @zk
end

Instance Method Details

#cast_ballot!(data) ⇒ Object



89
90
91
92
93
94
95
# File 'lib/zk/election.rb', line 89

def cast_ballot!(data)
  return if @vote_path
  create_root_path!
  @vote_path = @zk.create("#{root_vote_path}/#{VOTE_PREFIX}", data, :mode => :ephemeral_sequential)
rescue Exceptions::NoNode
  retry
end

#create_root_path!Object (protected)



146
147
148
# File 'lib/zk/election.rb', line 146

def create_root_path!
  @zk.mkdir_p(root_vote_path)
end

#digit(path) ⇒ Object (protected)



154
155
156
# File 'lib/zk/election.rb', line 154

def digit(path)
  path[/\d+$/].to_i
end

#leader_ack_pathObject

this znode will be created as an acknowledgement by the leader that it's aware of its status as the new leader and has run its procedures to become master



85
86
87
# File 'lib/zk/election.rb', line 85

def leader_ack_path
  @leader_ack_path ||= "#{root_vote_path}/leader_ack"
end

#leader_acked?(watch = false) ⇒ Boolean

has the leader acknowledged their role?

Returns:

  • (Boolean)


98
99
100
# File 'lib/zk/election.rb', line 98

def leader_acked?(watch=false)
  @zk.exists?(leader_ack_path, :watch => watch)
end

#leader_dataObject

return the data from the current leader or nil if there is no current leader



103
104
105
106
# File 'lib/zk/election.rb', line 103

def leader_data
  @zk.get(leader_ack_path).first
rescue Exceptions::NoNode
end

#on_leader_ack(&block) ⇒ Object

Asynchronously call the block when the leader has acknowledged its role.



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/zk/election.rb', line 110

def on_leader_ack(&block)
  creation_sub = @zk.register(leader_ack_path, :only => [:created, :changed]) do |event|
    begin
      logger.debug { "in #{leader_ack_path} watcher, got creation event, notifying" }
      safe_call(block)
    ensure
      creation_sub.unregister
    end
  end

  deletion_sub = @zk.register(leader_ack_path, :only => [:deleted, :child]) do |event|
    if @zk.exists?(leader_ack_path, :watch => true)
      begin
        logger.debug { "in #{leader_ack_path} watcher, node created behind our back, notifying" }
        safe_call(block)
      ensure
        creation_sub.unregister
      end
    else
      logger.debug { "in #{leader_ack_path} watcher, got non-creation event, re-watching" }
    end
  end

  subs = [creation_sub, deletion_sub]

  if @zk.exists?(leader_ack_path, :watch => true)
    logger.debug { "on_leader_ack, #{leader_ack_path} exists, calling block" }
    begin
      safe_call(block)
    ensure
      subs.each { |s| s.unregister }
    end
  end
end

#root_vote_pathObject

holds the ephemeral nodes of this election



78
79
80
# File 'lib/zk/election.rb', line 78

def root_vote_path #:nodoc:
  @root_vote_path ||= "#{@root_election_node}/#{@name.gsub('/', '__')}"
end

#safe_call(*callbacks) ⇒ Object (protected)



158
159
160
161
162
163
164
165
166
167
# File 'lib/zk/election.rb', line 158

def safe_call(*callbacks)
  callbacks.each do |cb|
    begin
      cb.call
    rescue Exception => e
      logger.error { "Error caught in user supplied callback" }
      logger.error { e.to_std_format }
    end
  end
end

#synchronizeObject (protected)



169
170
171
172
173
# File 'lib/zk/election.rb', line 169

def synchronize
#           call_line = caller[0..-2]
#           logger.debug { "synchronizing, backtrace:\n#{call_line.join("\n")}" }
  @mutex.synchronize { yield }
end

#vote_basenameObject (protected)



150
151
152
# File 'lib/zk/election.rb', line 150

def vote_basename
  vote_path and File.basename(vote_path)
end