Class: Kafkat::Interface::Admin

Inherits:
Object
  • Object
show all
Defined in:
lib/kafkat/interface/admin.rb

Defined Under Namespace

Classes: ExecutionFailedError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Admin

Returns a new instance of Admin.



12
13
14
15
16
# File 'lib/kafkat/interface/admin.rb', line 12

def initialize(config)
  @kafka_path = config.kafka_path
  @zk_path = config.zk_path
  @json_files_path = config.json_files_path
end

Instance Attribute Details

#json_files_pathObject (readonly)

Returns the value of attribute json_files_path.



10
11
12
# File 'lib/kafkat/interface/admin.rb', line 10

def json_files_path
  @json_files_path
end

#kafka_pathObject (readonly)

Returns the value of attribute kafka_path.



8
9
10
# File 'lib/kafkat/interface/admin.rb', line 8

def kafka_path
  @kafka_path
end

#zk_pathObject (readonly)

Returns the value of attribute zk_path.



9
10
11
# File 'lib/kafkat/interface/admin.rb', line 9

def zk_path
  @zk_path
end

Instance Method Details

#elect_leaders!(partitions) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/kafkat/interface/admin.rb', line 18

def elect_leaders!(partitions)
  file = File.new File.join(@json_files_path, "elect-leaders_#{Time.now.xmlschema}.json"), "w"

  json_partitions = []
  partitions.each do |p|
    json_partitions << {
      'topic' => p.topic_name,
      'partition' => p.id
    }
  end

  json = {'partitions' => json_partitions}
  file.write(JSON.dump(json))
  file.close

  puts "Using JSON file: " + file.path

  run_tool(
    'kafka-preferred-replica-election',
      '--path-to-json-file', file.path
  )
end

#reassign!(assignments) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/kafkat/interface/admin.rb', line 41

def reassign!(assignments)
  file_name = "reassign_#{Time.now.xmlschema}.json"
  file = File.new File.join(@json_files_path, file_name), "w"

  json_partitions = []
  assignments.each do |a|
    json_partitions << {
      'topic' => a.topic_name,
      'partition' => a.partition_id,
      'replicas' => a.replicas
    }
  end

  json = {
    'partitions' => json_partitions,
    'version' => 1
  }

  file.write(JSON.dump(json))
  file.close

  puts "Using JSON file: " + file.path
  puts "Run this command to check the status: kafkat verify-reassign #{file_name}"

  run_tool(
    'kafka-reassign-partitions',
      '--execute',
      '--reassignment-json-file', file.path
  )
end

#run_tool(name, *args) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/kafkat/interface/admin.rb', line 101

def run_tool(name, *args)
  path = File.join(kafka_path, "bin/#{name}.sh")
  # The scripts in the Confluent package does not have .sh extensions
  if !File.exist? path
    path = File.join(kafka_path, "bin/#{name}")
  end
  args += ['--zookeeper', "\"#{zk_path}\""]
  args_string = args.join(' ')
  result = `#{path} #{args_string}`
  raise ExecutionFailedError if $?.to_i > 0
  result
end

#shutdown!(broker_id, options = {}) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
# File 'lib/kafkat/interface/admin.rb', line 89

def shutdown!(broker_id, options={})
  args = ['--broker', broker_id]
  args += ['--num.retries', options[:retries]] if options[:retries]
  args += ['--retry.interval.ms', option[:interval]] if options[:interval]

  run_tool(
    'kafka-run-class',
      'kafka.admin.ShutdownBroker',
      *args
  )
end

#verify_reassign(file_name) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/kafkat/interface/admin.rb', line 72

def verify_reassign(file_name)
  file =
    if File.exist? file_name
      File.new file_name
    else
      File.new File.join(@json_files_path, file_name)
    end

  puts "Using JSON file: " + file.path

  run_tool(
    'kafka-reassign-partitions',
      '--verify',
      '--reassignment-json-file', file.path
  )
end