Class: Kafkat::ClusterRestart::Session
- Inherits:
-
Object
- Object
- Kafkat::ClusterRestart::Session
show all
- Defined in:
- lib/kafkat/command/cluster_restart.rb
Defined Under Namespace
Classes: NotFoundError, ParseError
Constant Summary
collapse
- SESSION_PATH =
'~/kafkat_cluster_restart_session.json'
- STATE_RESTARTED =
Use String instead of Symbol to facilitate JSON serialization/deserialization.
'restarted'
- STATE_NOT_RESTARTED =
'not_restarted'
- STATE_PENDING =
'pending'
- STATES =
[STATE_NOT_RESTARTED, STATE_RESTARTED, STATE_PENDING]
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(data = {}) ⇒ Session
Returns a new instance of Session.
270
271
272
|
# File 'lib/kafkat/command/cluster_restart.rb', line 270
# Builds a session from an already-parsed data structure.
#
# @param data [Hash] parsed session data; only the 'broker_states' entry
#   (String key) is read
# @return [Session]
def initialize(data = {})
  stored_states = data['broker_states']
  # A missing or nil entry starts the session with no tracked brokers.
  @broker_states = stored_states || {}
end
|
Instance Attribute Details
#broker_states ⇒ Object
Returns the value of attribute broker_states.
236
237
238
|
# File 'lib/kafkat/command/cluster_restart.rb', line 236
# Reader for the per-broker restart states held in @broker_states.
#
# @return [Object] whatever #initialize stored: the 'broker_states' entry of
#   the data it was given, or an empty Hash when that entry was missing or nil
def broker_states
@broker_states
end
|
Class Method Details
.exists? ⇒ Boolean
238
239
240
|
# File 'lib/kafkat/command/cluster_restart.rb', line 238
# Tells whether a session file is present at the default location.
#
# @return [Boolean] true when the expanded SESSION_PATH names a regular file
def self.exists?
  session_path = File.expand_path(SESSION_PATH)
  File.file?(session_path)
end
|
.from_brokers(brokers) ⇒ Object
265
266
267
268
|
# File 'lib/kafkat/command/cluster_restart.rb', line 265
# Creates a session in which every given broker is marked as not restarted.
#
# @param brokers [Enumerable] broker ids; each becomes a key of the state map
# @return [Session] a new session wrapping the generated state map
def self.from_brokers(brokers)
  initial_states = brokers.map { |broker_id| [broker_id, STATE_NOT_RESTARTED] }.to_h
  Session.new('broker_states' => initial_states)
end
|
.from_zookeepers(zookeeper) ⇒ Object
260
261
262
263
|
# File 'lib/kafkat/command/cluster_restart.rb', line 260
# Creates a fresh session covering the broker ids reported by the given
# zookeeper object.
#
# @param zookeeper [#get_broker_ids] source of the broker ids
# @return [Object] the result of Session.from_brokers for those ids
def self.from_zookeepers(zookeeper)
  Session.from_brokers(zookeeper.get_broker_ids)
end
|
.load!(session_file = SESSION_PATH) ⇒ Object
242
243
244
245
246
247
248
249
250
251
252
253
|
# File 'lib/kafkat/command/cluster_restart.rb', line 242
# Reads a session file and builds a session from its JSON content.
#
# @param session_file [String] path to the session file; expanded with
#   File.expand_path, so a leading '~' is resolved
# @return [Session] session built from the parsed JSON object
# @raise [NotFoundError] when the file does not exist
# @raise [ParseError] when the content is not valid JSON, or is valid JSON
#   whose top-level value is not an object
def self.load!(session_file = SESSION_PATH)
  path = File.expand_path(session_file)
  json = JSON.parse(File.read(path))
  # Documents such as `null` or `[]` parse successfully but cannot be read
  # as session data; report them as a parse failure rather than letting the
  # constructor fail with an unrelated NoMethodError/TypeError.
  raise ParseError unless json.is_a?(Hash)

  new(json)
rescue Errno::ENOENT
  raise NotFoundError
rescue JSON::JSONError
  raise ParseError
end
|
.reset!(session_file = SESSION_PATH) ⇒ Object
255
256
257
258
|
# File 'lib/kafkat/command/cluster_restart.rb', line 255
# Removes the session file.
#
# @param session_file [String] path to the session file; expanded with
#   File.expand_path before deletion
# @return [Integer] the value returned by the delete call
# @raise [Errno::ENOENT] when the file does not exist (not translated here)
def self.reset!(session_file = SESSION_PATH)
  File.unlink(File.expand_path(session_file))
end
|
Instance Method Details
#all_restarted? ⇒ Boolean
319
320
321
|
# File 'lib/kafkat/command/cluster_restart.rb', line 319
# Tells whether every tracked broker is in STATE_RESTARTED.
#
# @return [Boolean] true when all stored states equal STATE_RESTARTED; also
#   true when no brokers are tracked
def all_restarted?
  @broker_states.all? { |_broker_id, current| current == STATE_RESTARTED }
end
|
#not_restarted?(broker_id) ⇒ Boolean
311
312
313
|
# File 'lib/kafkat/command/cluster_restart.rb', line 311
# Tells whether the given broker is currently recorded as STATE_NOT_RESTARTED.
#
# Delegates to #state?, so an id that is not a key of @broker_states raises
# UnknownBrokerError instead of returning false.
#
# @param broker_id [Object] key identifying the broker in the session
# @return [Boolean] true when the stored state equals STATE_NOT_RESTARTED
def not_restarted?(broker_id)
state?(broker_id, STATE_NOT_RESTARTED)
end
|
#pending?(broker_id) ⇒ Boolean
307
308
309
|
# File 'lib/kafkat/command/cluster_restart.rb', line 307
# Tells whether the given broker is currently recorded as STATE_PENDING.
#
# Delegates to #state?, so an id that is not a key of @broker_states raises
# UnknownBrokerError instead of returning false.
#
# @param broker_id [Object] key identifying the broker in the session
# @return [Boolean] true when the stored state equals STATE_PENDING
def pending?(broker_id)
state?(broker_id, STATE_PENDING)
end
|
#pending_brokers ⇒ Object
323
324
325
326
327
|
# File 'lib/kafkat/command/cluster_restart.rb', line 323
# Lists the brokers whose recorded state is STATE_PENDING.
#
# @return [Array] broker ids, in the order they appear in the state map
def pending_brokers
  broker_states.select { |_broker_id, current| current == STATE_PENDING }.keys
end
|
#restarted?(broker_id) ⇒ Boolean
315
316
317
|
# File 'lib/kafkat/command/cluster_restart.rb', line 315
# Tells whether the given broker is currently recorded as STATE_RESTARTED.
#
# Delegates to #state?, so an id that is not a key of @broker_states raises
# UnknownBrokerError instead of returning false.
#
# @param broker_id [Object] key identifying the broker in the session
# @return [Boolean] true when the stored state equals STATE_RESTARTED
def restarted?(broker_id)
state?(broker_id, STATE_RESTARTED)
end
|
#save!(session_file = SESSION_PATH) ⇒ Object
274
275
276
277
278
|
# File 'lib/kafkat/command/cluster_restart.rb', line 274
# Writes the session to disk as pretty-printed JSON followed by a newline.
#
# @param session_file [String] destination path; expanded with
#   File.expand_path and opened in 'w' mode, replacing existing content
# @return [nil]
def save!(session_file = SESSION_PATH)
  destination = File.expand_path(session_file)
  # The file is opened before the JSON is generated, as in the original.
  File.open(destination, 'w') { |io| io.puts(JSON.pretty_generate(to_h)) }
end
|
#state(broker_id) ⇒ Object
296
297
298
299
|
# File 'lib/kafkat/command/cluster_restart.rb', line 296
# Looks up the recorded state of one broker.
#
# @param broker_id [Object] key identifying the broker in the session
# @return [Object] the state stored for that broker
# @raise [UnknownBrokerError] when the id is not a key of the state map
def state(broker_id)
  return broker_states[broker_id] if @broker_states.key?(broker_id)

  raise UnknownBrokerError, "Unknown broker: #{broker_id}"
end
|
#state?(broker_id, state) ⇒ Boolean
301
302
303
304
305
|
# File 'lib/kafkat/command/cluster_restart.rb', line 301
# Tells whether a broker is recorded in the given state.
#
# The broker id is validated before the state, so a call with both an unknown
# broker and an unknown state reports the broker.
#
# @param broker_id [Object] key identifying the broker in the session
# @param state [String] one of the values in STATES
# @return [Boolean] true when the stored state equals +state+
# @raise [UnknownBrokerError] when the id is not a key of the state map
# @raise [UnknownStateError] when +state+ is not included in STATES
def state?(broker_id, state)
  unless @broker_states.key?(broker_id)
    raise UnknownBrokerError, "Unknown broker: #{broker_id}"
  end
  unless STATES.include?(state)
    raise UnknownStateError, "Unknown state: #{state}"
  end

  @broker_states[broker_id] == state
end
|
#to_h ⇒ Object
329
330
331
332
333
|
# File 'lib/kafkat/command/cluster_restart.rb', line 329
# Hash representation of the session, as handed to the JSON generator by
# #save!.
#
# @return [Hash] a single :broker_states (Symbol) entry holding the state map
def to_h
  { broker_states: broker_states }
end
|
#update_states!(state, ids) ⇒ Object
280
281
282
283
284
285
286
287
288
289
290
291
292
293
|
# File 'lib/kafkat/command/cluster_restart.rb', line 280
# Sets the given brokers to a new state.
#
# Both arguments are validated before anything is written, so a rejected call
# leaves the state map untouched.
#
# @param state [String, Symbol] target state; a Symbol is converted to its
#   String form and the result must be included in STATES
# @param ids [Array] broker ids to update; repeated ids are allowed
# @return [self]
# @raise [UnknownStateError] when the state is not included in STATES
# @raise [UnknownBrokerError] when any id is not a key of the state map
def update_states!(state, ids)
  state = state.to_s if state.is_a?(Symbol)
  raise UnknownStateError, "Unknown State #{state}" unless STATES.include?(state)

  # Compute the unknown ids directly. Comparing `ids & keys` with `ids`
  # rejected lists containing a repeated known id, because `&` de-duplicates.
  unknown_ids = ids - broker_states.keys
  unless unknown_ids.empty?
    raise UnknownBrokerError, "Unknown brokers: #{unknown_ids.join(', ')}"
  end

  ids.each { |id| broker_states[id] = state }
  self
end
|