Class: Kazoo::Consumergroup

Inherits:
Object
  • Object
show all
Defined in:
lib/kazoo/consumergroup.rb

Defined Under Namespace

Classes: Instance

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cluster, name) ⇒ Consumergroup

Returns a new instance of Consumergroup.



5
6
7
# File 'lib/kazoo/consumergroup.rb', line 5

# Builds a consumer group handle.
#
# @param cluster [Object] stored in @cluster without modification
# @param name [Object] stored in @name without modification
def initialize(cluster, name)
  @cluster = cluster
  @name = name
end

Instance Attribute Details

#cluster ⇒ Object (readonly)

Returns the value of attribute cluster.



3
4
5
# File 'lib/kazoo/consumergroup.rb', line 3

# Reader for the cluster attribute.
#
# @return [Object] the current value of the @cluster instance variable
def cluster
  @cluster
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



3
4
5
# File 'lib/kazoo/consumergroup.rb', line 3

# Reader for the name attribute.
#
# @return [Object] the current value of the @name instance variable
def name
  @name
end

Instance Method Details

#commit_offset(partition, offset) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/kazoo/consumergroup.rb', line 71

# Writes offset + 1 (as a string) to this group's offset node for the given
# partition.
#
# The node is updated with a set call first. If that reports ZNONODE, the
# topic-level node is created (an already-existing node is accepted) and the
# partition node is then created carrying the same data.
#
# @param partition [#topic, #id] partition whose offset is committed; the
#   topic must respond to #name
# @param offset [Integer] the value written is offset + 1
# @return [nil]
# @raise [Kazoo::Error] when the topic node cannot be created, or when the
#   final set/create call does not report ZOK
def commit_offset(partition, offset)
  topic_path = "/consumers/#{name}/offsets/#{partition.topic.name}"
  node_path = "#{topic_path}/#{partition.id}"
  payload = (offset + 1).to_s
  failure = lambda do |rc|
    "Failed to commit offset #{offset} for partition #{partition.topic.name}/#{partition.id}. Error code: #{rc}"
  end

  response = cluster.zk.set(path: node_path, data: payload)

  if response.fetch(:rc) == Zookeeper::Constants::ZNONODE
    # The partition node does not exist yet: make sure its parent is there first.
    parent = cluster.zk.create(path: topic_path)
    tolerated = [Zookeeper::Constants::ZOK, Zookeeper::Constants::ZNODEEXISTS]
    raise Kazoo::Error, failure.call(parent.fetch(:rc)) unless tolerated.include?(parent.fetch(:rc))

    response = cluster.zk.create(path: node_path, data: payload)
  end

  return if response.fetch(:rc) == Zookeeper::Constants::ZOK

  raise Kazoo::Error, failure.call(response.fetch(:rc))
end

#create ⇒ Object



9
10
11
12
13
14
# File 'lib/kazoo/consumergroup.rb', line 9

# Issues create calls for the group's root node and its ids, owners and
# offsets children, in that order. The replies are not inspected.
#
# @return [Object] whatever the last create call (the offsets node) returned
def create
  root = "/consumers/#{name}"
  node_paths = [root, "#{root}/ids", "#{root}/owners", "#{root}/offsets"]
  node_paths.map { |node_path| cluster.zk.create(path: node_path) }.last
end

#eql?(other) ⇒ Boolean Also known as: ==

Returns:

  • (Boolean)


111
112
113
# File 'lib/kazoo/consumergroup.rb', line 111

# Equality check: the other object must be a Kazoo::Consumergroup whose
# cluster and name both compare equal (==) to this group's.
#
# @param other [Object] object to compare against
# @return [Boolean]
def eql?(other)
  return false unless other.kind_of?(Kazoo::Consumergroup)

  same_cluster = cluster == other.cluster
  same_cluster && name == other.name
end

#exists? ⇒ Boolean

Returns:

  • (Boolean)


16
17
18
19
# File 'lib/kazoo/consumergroup.rb', line 16

# Asks Zookeeper for the stat of the group's root node and reports whether
# that stat says the node exists.
#
# @return [Boolean] the result of calling exists? on the returned :stat entry
def exists?
  group_path = "/consumers/#{name}"
  cluster.zk.stat(path: group_path).fetch(:stat).exists?
end

#hash ⇒ Object



117
118
119
# File 'lib/kazoo/consumergroup.rb', line 117

# Hash code built from the same two values that eql? compares.
#
# @return [Integer] the hash of the [cluster, name] pair
def hash
  identity = [cluster, name]
  identity.hash
end

#inspect ⇒ Object



107
108
109
# File 'lib/kazoo/consumergroup.rb', line 107

# Short human-readable representation that shows only the group name.
#
# @return [String]
def inspect
  format("#<Kazoo::Consumergroup name=%s>", name)
end

#instances ⇒ Object



26
27
28
29
# File 'lib/kazoo/consumergroup.rb', line 26

# Lists the children of the group's ids node and wraps each child id in an
# Instance bound to this group.
#
# The result code is checked before the children are read, in the same way
# watch_instances does it, so a failed lookup is reported as Kazoo::Error
# rather than as an unrelated error raised while mapping the children.
#
# @return [Array<Instance>] one Instance per child id
# @raise [Kazoo::Error] when get_children does not report ZOK
def instances
  result = cluster.zk.get_children(path: "/consumers/#{name}/ids")

  if result.fetch(:rc) != Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to list instances. Error code: #{result.fetch(:rc)}"
  end

  result.fetch(:children).map { |id| Instance.new(self, id: id) }
end

#instantiate(id: nil) ⇒ Object



22
23
24
# File 'lib/kazoo/consumergroup.rb', line 22

# Builds a new Instance bound to this consumer group.
#
# @param id [Object, nil] forwarded unchanged as the Instance's id: keyword
# @return [Instance]
def instantiate(id: nil)
  Instance.new(self, id: id)
end

#reset_offsets ⇒ Object

Raises:

  • (Kazoo::Error)



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/kazoo/consumergroup.rb', line 89

# Removes every committed offset of this group: for each topic node under the
# group's offsets node, all partition nodes are deleted and then the topic
# node itself.
#
# The reply of every delete call is captured and checked. Checking the
# earlier get_children reply instead would let a failed delete go unnoticed.
#
# @return [Array] the topic names that were listed under the offsets node
# @raise [Kazoo::Error] when listing children or deleting a node does not
#   report ZOK
def reset_offsets
  offsets_path = "/consumers/#{name}/offsets"

  topics = cluster.zk.get_children(path: offsets_path)
  unless topics.fetch(:rc) == Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed to list topics under #{offsets_path}. Error code: #{topics.fetch(:rc)}"
  end

  topics.fetch(:children).each do |topic|
    topic_path = "#{offsets_path}/#{topic}"

    partitions = cluster.zk.get_children(path: topic_path)
    unless partitions.fetch(:rc) == Zookeeper::Constants::ZOK
      raise Kazoo::Error, "Failed to list partitions under #{topic_path}. Error code: #{partitions.fetch(:rc)}"
    end

    partitions.fetch(:children).each do |partition|
      partition_path = "#{topic_path}/#{partition}"
      deleted = cluster.zk.delete(path: partition_path)
      unless deleted.fetch(:rc) == Zookeeper::Constants::ZOK
        raise Kazoo::Error, "Failed to delete #{partition_path}. Error code: #{deleted.fetch(:rc)}"
      end
    end

    # The topic node can only go once its partition nodes are gone.
    deleted = cluster.zk.delete(path: topic_path)
    unless deleted.fetch(:rc) == Zookeeper::Constants::ZOK
      raise Kazoo::Error, "Failed to delete #{topic_path}. Error code: #{deleted.fetch(:rc)}"
    end
  end
end

#retrieve_offset(partition) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
# File 'lib/kazoo/consumergroup.rb', line 59

# Reads this group's offset node for the given partition.
#
# @param partition [#topic, #id] partition to look up; the topic must respond
#   to #name
# @return [Integer, nil] the node data converted with to_i, or nil when the
#   node does not exist (ZNONODE)
# @raise [Kazoo::Error] for any other result code
def retrieve_offset(partition)
  response = cluster.zk.get(path: "/consumers/#{name}/offsets/#{partition.topic.name}/#{partition.id}")
  rc = response.fetch(:rc)

  return response.fetch(:data).to_i if rc == Zookeeper::Constants::ZOK
  return nil if rc == Zookeeper::Constants::ZNONODE

  raise Kazoo::Error, "Failed to retrieve offset for partition #{partition.topic.name}/#{partition.id}. Error code: #{rc}"
end

#watch_instances(&block) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
# File 'lib/kazoo/consumergroup.rb', line 31

# Lists the children of the group's ids node while registering a watcher
# callback built from the given block.
#
# @yield handed to Zookeeper::Callbacks::WatcherCallback.create
# @return [Array] a two-element array: the Instance objects built from the
#   child ids, and the watcher callback that was registered
# @raise [Kazoo::Error] when get_children does not report ZOK
def watch_instances(&block)
  watcher = Zookeeper::Callbacks::WatcherCallback.create(&block)
  response = cluster.zk.get_children(path: "/consumers/#{name}/ids", watcher: watcher)

  rc = response.fetch(:rc)
  raise Kazoo::Error, "Failed to watch instances. Error code: #{rc}" unless rc == Zookeeper::Constants::ZOK

  members = response.fetch(:children).map { |member_id| Instance.new(self, id: member_id) }
  [members, watcher]
end

#watch_partition_claim(partition, &block) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/kazoo/consumergroup.rb', line 44

# Reads the owner node of the given partition for this group, passing along a
# watcher callback built from the given block.
#
# @param partition [#topic, #id] partition whose claim is looked up; the topic
#   must respond to #name
# @yield handed to Zookeeper::Callbacks::WatcherCallback.create
# @return [Array] [nil, nil] when the owner node does not exist (ZNONODE);
#   otherwise the Instance built from the node's data and the watcher callback
# @raise [Kazoo::Error] for any result code other than ZOK or ZNONODE
def watch_partition_claim(partition, &block)
  watcher = Zookeeper::Callbacks::WatcherCallback.create(&block)
  response = cluster.zk.get(path: "/consumers/#{name}/owners/#{partition.topic.name}/#{partition.id}", watcher: watcher)
  rc = response.fetch(:rc)

  # Nobody is claiming this partition yet
  return [nil, nil] if rc == Zookeeper::Constants::ZNONODE

  unless rc == Zookeeper::Constants::ZOK
    raise Kazoo::Error, "Failed set watch for partition claim of #{partition.topic.name}/#{partition.id}. Error code: #{rc}"
  end

  claimant = Kazoo::Consumergroup::Instance.new(self, id: response.fetch(:data))
  [claimant, watcher]
end