Class: Kafkat::ClusterRestart::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/kafkat/command/cluster_restart.rb

Defined Under Namespace

Classes: NotFoundError, ParseError

Constant Summary collapse

SESSION_PATH =
'~/kafkat_cluster_restart_session.json'
STATE_RESTARTED =

Use String instead of Symbol to facilitate JSON serialization/deserialization.

'restarted'
STATE_NOT_RESTARTED =
'not_restarted'
STATE_PENDING =
'pending'
STATES =
[STATE_NOT_RESTARTED, STATE_RESTARTED, STATE_PENDING]

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(data = {}) ⇒ Session

Returns a new instance of Session.



270
271
272
# File 'lib/kafkat/command/cluster_restart.rb', line 270

# Builds a session from a plain hash of session data.
#
# The broker-state map is looked up under the String key 'broker_states'
# (the shape JSON.parse yields) and, failing that, under the Symbol key
# :broker_states (the shape #to_h yields), so Session.new(session.to_h)
# keeps its states instead of silently starting empty.
#
# @param data [Hash] session data; when neither key holds a value the
#   session starts with an empty state map
def initialize(data = {})
  @broker_states = data['broker_states'] || data[:broker_states] || {}
end

Instance Attribute Details

#broker_states ⇒ Object (readonly)

Returns the value of attribute broker_states.



236
237
238
# File 'lib/kafkat/command/cluster_restart.rb', line 236

# Read accessor for the broker-state map held in @broker_states.
# The stored hash itself is returned, not a copy.
def broker_states
  @broker_states
end

Class Method Details

.exists? ⇒ Boolean

Returns:

  • (Boolean)


238
239
240
# File 'lib/kafkat/command/cluster_restart.rb', line 238

# Reports whether a session file is present at the default location.
#
# @return [Boolean] true when SESSION_PATH, with '~' expanded, names a file
def self.exists?
  session_path = File.expand_path(SESSION_PATH)
  File.file?(session_path)
end

.from_brokers(brokers) ⇒ Object



265
266
267
268
# File 'lib/kafkat/command/cluster_restart.rb', line 265

# Creates a session in which every given broker starts out as not restarted.
#
# @param brokers [Enumerable] broker ids
# @return [Session] session mapping each id to STATE_NOT_RESTARTED
def self.from_brokers(brokers)
  initial_states = {}
  brokers.each { |broker_id| initial_states[broker_id] = STATE_NOT_RESTARTED }
  Session.new('broker_states' => initial_states)
end

.from_zookeepers(zookeeper) ⇒ Object



260
261
262
263
# File 'lib/kafkat/command/cluster_restart.rb', line 260

# Creates a fresh session covering the broker ids reported by ZooKeeper.
#
# @param zookeeper [#get_broker_ids] object that answers get_broker_ids
# @return [Object] whatever Session.from_brokers returns for those ids
def self.from_zookeepers(zookeeper)
  Session.from_brokers(zookeeper.get_broker_ids)
end

.load!(session_file = SESSION_PATH) ⇒ Object



242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/kafkat/command/cluster_restart.rb', line 242

# Loads a previously saved session from a JSON file.
#
# @param session_file [String] path of the session file; '~' is expanded
# @return [Session] session built from the parsed file contents
# @raise [NotFoundError] if the file does not exist
# @raise [ParseError] if the file is not valid JSON, or if its top-level
#   value is not a JSON object
def self.load!(session_file = SESSION_PATH)
  path = File.expand_path(session_file)
  json = JSON.parse(File.read(path))

  # Valid JSON such as `[]` or `"x"` is still not a session. Reject it here
  # instead of handing a non-hash value to the constructor.
  raise ParseError unless json.is_a?(Hash)

  new(json)
rescue Errno::ENOENT
  raise NotFoundError
rescue JSON::JSONError
  raise ParseError
end

.reset!(session_file = SESSION_PATH) ⇒ Object



255
256
257
258
# File 'lib/kafkat/command/cluster_restart.rb', line 255

# Removes the session file from disk.
#
# A missing file is not rescued here, so the error raised by File.delete
# (Errno::ENOENT) reaches the caller.
#
# @param session_file [String] path of the session file; '~' is expanded
# @return [Integer] the result of File.delete
def self.reset!(session_file = SESSION_PATH)
  File.delete(File.expand_path(session_file))
end

Instance Method Details

#all_restarted? ⇒ Boolean

Returns:

  • (Boolean)


319
320
321
# File 'lib/kafkat/command/cluster_restart.rb', line 319

# Tells whether every broker in the session has been restarted.
#
# @return [Boolean] true when each recorded state equals STATE_RESTARTED
#   (also true when no brokers are recorded)
def all_restarted?
  @broker_states.each_value.all? { |broker_state| broker_state == STATE_RESTARTED }
end

#not_restarted?(broker_id) ⇒ Boolean

Returns:

  • (Boolean)


311
312
313
# File 'lib/kafkat/command/cluster_restart.rb', line 311

# Tells whether the given broker is currently recorded as STATE_NOT_RESTARTED.
# Delegates to #state?, so any error raised there propagates to the caller.
def not_restarted?(broker_id)
  state?(broker_id, STATE_NOT_RESTARTED)
end

#pending?(broker_id) ⇒ Boolean

Returns:

  • (Boolean)


307
308
309
# File 'lib/kafkat/command/cluster_restart.rb', line 307

# Tells whether the given broker is currently recorded as STATE_PENDING.
# Delegates to #state?, so any error raised there propagates to the caller.
def pending?(broker_id)
  state?(broker_id, STATE_PENDING)
end

#pending_brokers ⇒ Object



323
324
325
326
327
# File 'lib/kafkat/command/cluster_restart.rb', line 323

# Lists the brokers whose recorded state is STATE_PENDING.
#
# @return [Array] ids of the pending brokers, in the state map's key order
def pending_brokers
  broker_states.select { |_broker_id, broker_state| broker_state == STATE_PENDING }.keys
end

#restarted?(broker_id) ⇒ Boolean

Returns:

  • (Boolean)


315
316
317
# File 'lib/kafkat/command/cluster_restart.rb', line 315

# Tells whether the given broker is currently recorded as STATE_RESTARTED.
# Delegates to #state?, so any error raised there propagates to the caller.
def restarted?(broker_id)
  state?(broker_id, STATE_RESTARTED)
end

#save!(session_file = SESSION_PATH) ⇒ Object



274
275
276
277
278
# File 'lib/kafkat/command/cluster_restart.rb', line 274

# Writes the session to disk as pretty-printed JSON, replacing any
# existing file content.
#
# @param session_file [String] destination path; '~' is expanded
# @return [nil] the value of the File.open block, i.e. the result of puts
def save!(session_file = SESSION_PATH)
  destination = File.expand_path(session_file)
  File.open(destination, 'w') do |io|
    # Serialise inside the block so the file is opened before #to_h runs,
    # exactly as before.
    serialized = JSON.pretty_generate(self.to_h)
    io.puts(serialized)
  end
end

#state(broker_id) ⇒ Object

Raises:

  • (UnknownBrokerError)

296
297
298
299
# File 'lib/kafkat/command/cluster_restart.rb', line 296

# Looks up the recorded state of one broker.
#
# @param broker_id [Object] id of the broker to look up
# @return [Object] the state stored for that broker
# @raise [UnknownBrokerError] if the session has no entry for broker_id
def state(broker_id)
  @broker_states.fetch(broker_id) do
    raise UnknownBrokerError, "Unknown broker: #{broker_id}"
  end
end

#state?(broker_id, state) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • (UnknownBrokerError)
  • (UnknownStateError)

301
302
303
304
305
# File 'lib/kafkat/command/cluster_restart.rb', line 301

# Tells whether a broker is currently recorded in the given state.
#
# @param broker_id [Object] id of the broker to check
# @param state [String, Symbol] one of STATES; a Symbol is converted to
#   its String form before the comparison
# @return [Boolean] true when the broker's recorded state equals state
# @raise [UnknownBrokerError] if the session has no entry for broker_id
# @raise [UnknownStateError] if state is not one of STATES
def state?(broker_id, state)
  raise UnknownBrokerError, "Unknown broker: #{broker_id}" unless @broker_states.key?(broker_id)

  # States are stored as Strings. Normalise a Symbol the same way
  # #update_states! does, so state?(id, :pending) gets an answer instead
  # of being rejected as an unknown state.
  state = state.to_s if state.is_a?(Symbol)
  raise UnknownStateError, "Unknown state: #{state}" unless STATES.include?(state)

  @broker_states[broker_id] == state
end

#to_h ⇒ Object



329
330
331
332
333
# File 'lib/kafkat/command/cluster_restart.rb', line 329

# Converts the session into a plain hash; this is what #save! serialises.
#
# @return [Hash] a single-entry hash mapping :broker_states to the state map
def to_h
  { broker_states: broker_states }
end

#update_states!(state, ids) ⇒ Object



280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/kafkat/command/cluster_restart.rb', line 280

# Records a new state for a set of brokers.
#
# Both arguments are validated before anything is changed, so a call that
# raises leaves the recorded states untouched.
#
# @param state [String, Symbol] one of STATES; a Symbol is converted to
#   its String form
# @param ids [Enumerable] ids of the brokers to update; an id may appear
#   more than once
# @return [self] the session, to allow chaining
# @raise [UnknownStateError] if state is not one of STATES
# @raise [UnknownBrokerError] if any id has no entry in the session
def update_states!(state, ids)
  state = state.to_s if state.is_a?(Symbol)
  raise UnknownStateError, "Unknown State #{state}" unless STATES.include?(state)

  # Check each id for membership rather than comparing `ids & keys` with
  # `ids`: Array#& drops duplicates, so a list naming a known broker twice
  # was rejected with an empty "Unknown brokers: " message.
  unknown = ids.reject { |id| broker_states.key?(id) }
  raise UnknownBrokerError, "Unknown brokers: #{unknown.join(', ')}" unless unknown.empty?

  ids.each { |id| broker_states[id] = state }
  self
end