Class: Kazoo::CLI::Consumers

Inherits:
Thor
  • Object
show all
Includes:
Common
Defined in:
lib/kazoo/cli/consumers.rb

Instance Method Summary collapse

Methods included from Common

included

Instance Method Details

#clean_stored_offsets(name) ⇒ Object

Raises:



118
119
120
121
122
123
124
125
126
# File 'lib/kazoo/cli/consumers.rb', line 118

# Cleans the stored offsets of a consumer group.
#
# The group is looked up through +kafka_cluster+; the actual cleanup is
# delegated to the group's own +clean_stored_offsets+ method.
#
# @param name name of the consumer group to look up
# @raise [Kazoo::Error] if the group does not exist, or if it exists but
#   is not active
# @return the result of the group's +clean_stored_offsets+ call
def clean_stored_offsets(name)
  validate_class_options!

  group = kafka_cluster.consumergroup(name)

  unless group.exists?
    raise Kazoo::Error, "Consumergroup #{group.name} is not registered in Zookeeper"
  end

  # Cleaning is only permitted while the group is active.
  unless group.active?
    raise Kazoo::Error, "Cannot clean offsets for #{group.name} if it is not running"
  end

  group.clean_stored_offsets
end

#clean_topic_claims(name) ⇒ Object

Raises:



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/kazoo/cli/consumers.rb', line 94

# Reports and removes lingering topic claims of a consumer group.
#
# A claim counts as lingering when the topic appears in the group's
# +claimed_topics+ but not in its +subscribed_topics+. When there are none,
# only a notice is printed. Otherwise the affected topic names are listed
# and the group's own +clean_topic_claims+ is invoked.
#
# @param name name of the consumer group to look up
# @raise [Kazoo::Error] if the group does not exist, or if it exists but
#   is not active
# @return [nil] when there was nothing to clean; otherwise the result of
#   the group's +clean_topic_claims+ call
def clean_topic_claims(name)
  validate_class_options!

  group = kafka_cluster.consumergroup(name)

  unless group.exists?
    raise Kazoo::Error, "Consumergroup #{group.name} is not registered in Zookeeper"
  end

  unless group.active?
    raise Kazoo::Error, "Cannot cleanup consumergroup #{group.name} if it is not running"
  end

  # Subscriptions are read before claims, then subtracted from them.
  subscribed = group.subscribed_topics
  stale = group.claimed_topics - subscribed

  if stale.empty?
    puts "The consumer group does not have any lingering topic claims."
    return
  end

  puts "The following topics were once claimed, but are no longer part of #{group.name}'s subscriptions:"
  stale.each { |topic| puts "- #{topic.name}" }

  group.clean_topic_claims
end

#delete(name) ⇒ Object

Raises:



72
73
74
75
76
77
78
79
80
# File 'lib/kazoo/cli/consumers.rb', line 72

# Removes a consumer group by calling +destroy+ on it.
#
# Deletion is refused while the group is still active.
#
# @param name name of the consumer group to look up
# @raise [Kazoo::Error] if the group does not exist, or if it is still active
# @return the result of the group's +destroy+ call
def delete(name)
  validate_class_options!

  group = kafka_cluster.consumergroup(name)

  if !group.exists?
    raise Kazoo::Error, "Consumergroup #{group.name} is not registered in Zookeeper"
  elsif group.active?
    raise Kazoo::Error, "Cannot remove consumergroup #{group.name} because it's still active"
  end

  group.destroy
end

#list ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/kazoo/cli/consumers.rb', line 7

# Prints one status line per consumer group known to +kafka_cluster+,
# ordered by group name.
#
# A group with no instances is reported as inactive; otherwise the number
# of instances is shown.
#
# @return the name-sorted collection of consumer groups that was iterated
def list
  validate_class_options!

  ordered_groups = kafka_cluster.consumergroups.sort_by { |cg| cg.name }

  ordered_groups.each do |group|
    running = group.instances.length

    status_line =
      if running.zero?
        "- #{group.name}: inactive"
      else
        "- #{group.name}: #{running} running instances"
      end

    puts status_line
  end
end

#reset(name) ⇒ Object

Raises:



83
84
85
86
87
88
89
90
91
# File 'lib/kazoo/cli/consumers.rb', line 83

# Resets all offsets of a consumer group by calling +reset_all_offsets+ on it.
#
# The reset is refused while the group is still active.
#
# @param name name of the consumer group to look up
# @raise [Kazoo::Error] if the group does not exist, or if it is still active
# @return the result of the group's +reset_all_offsets+ call
def reset(name)
  validate_class_options!

  cg = kafka_cluster.consumergroup(name)
  raise Kazoo::Error, "Consumergroup #{cg.name} is not registered in Zookeeper" unless cg.exists?
  # The message names the operation actually being refused (a reset, not a removal).
  raise Kazoo::Error, "Cannot reset consumergroup #{cg.name} because it's still active" if cg.active?

  cg.reset_all_offsets
end

#show(name) ⇒ Object

Raises:



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/kazoo/cli/consumers.rb', line 21

# Prints a report about a single consumer group.
#
# The report always contains the group name, its creation time and its
# subscribed topics (sorted by name). For a group with instances it also
# lists the instances, the partition claims (sorted by topic name, then
# partition id) and a warning for every partition of the group that has no
# claim. A group without instances is reported as inactive.
#
# @param name name of the consumer group to look up
# @raise [Kazoo::Error] if the group does not exist
# @return the sorted unclaimed partitions when a warning for them was
#   printed, +nil+ otherwise
def show(name)
  validate_class_options!

  group = kafka_cluster.consumergroup(name)
  unless group.exists?
    raise Kazoo::Error, "Consumergroup #{group.name} is not registered in Zookeeper"
  end

  subscriptions = group.subscribed_topics.sort_by { |topic| topic.name }

  puts "Consumer name: #{group.name}"
  puts "Created on: #{group.created_at}"
  puts "Topics (#{subscriptions.length}): #{subscriptions.map { |topic| topic.name }.join(', ')}"

  running = group.instances
  if running.length.zero?
    puts "This consumer group is inactive."
    return
  end

  puts
  puts "Running instances (#{running.length}):"
  running.each { |member| puts "- #{member.id}\t(created on #{member.created_at})" }

  # Both partition listings below share one ordering: topic name, then partition id.
  by_topic_and_id = ->(partition) { [partition.topic.name, partition.id] }

  claims = group.partition_claims
  if claims.length.zero?
    puts
    puts "WARNING: this consumer group is active but hasn't claimed any partitions"
  else
    claimed = claims.keys.sort_by(&by_topic_and_id)

    puts
    puts "Partition claims (#{claims.length}):"
    claimed.each do |partition|
      owner = claims[partition]
      puts "- #{partition.key}: #{owner.id}"
    end
  end

  # Whatever the group owns but nobody has claimed gets its own warning.
  unclaimed = (group.partitions - claims.keys).sort_by(&by_topic_and_id)
  return if unclaimed.length.zero?

  puts
  puts "WARNING: this consumergroup has #{unclaimed.length} unclaimed partitions:"
  unclaimed.each { |partition| puts "- #{partition.key}" }
end