Class: Kazoo::Consumergroup::Instance

Inherits:
Object
  • Object
show all
Defined in:
lib/kazoo/consumergroup.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(group, id: nil) ⇒ Instance

Returns a new instance of Instance.



143
144
145
146
# File 'lib/kazoo/consumergroup.rb', line 143

# Builds a consumer instance that belongs to +group+.
#
# @param group [Object] the consumer group this instance is part of
# @param id [String, nil] explicit instance id; when nil (or false) an id
#   is requested from the class-level +generate_id+
def initialize(group, id: nil)
  @group = group
  @id = id
  # Fall back to a generated id only when the caller did not supply one.
  @id ||= self.class.generate_id
end

Instance Attribute Details

#group ⇒ Object (readonly)

Returns the value of attribute group.



141
142
143
# File 'lib/kazoo/consumergroup.rb', line 141

# Read-only accessor for the group this instance was constructed with.
#
# @return [Object] the value stored in @group
def group
  @group
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



141
142
143
# File 'lib/kazoo/consumergroup.rb', line 141

# Read-only accessor for this instance's identifier.
#
# @return [Object] the value stored in @id (either the id passed to the
#   constructor or one produced by the class-level generate_id)
def id
  @id
end

Class Method Details

.generate_id ⇒ Object



137
138
139
# File 'lib/kazoo/consumergroup.rb', line 137

# Produces a fresh instance identifier made of the local hostname and a
# random UUID, separated by a colon.
#
# @return [String] an id of the form "<hostname>:<uuid>"
def self.generate_id
  hostname = Socket.gethostname
  unique_suffix = SecureRandom.uuid
  "#{hostname}:#{unique_suffix}"
end

Instance Method Details

#claim_partition(partition) ⇒ Object



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/kazoo/consumergroup.rb', line 184

# Claims +partition+ for this instance by creating an ephemeral node at
# /consumers/<group name>/owners/<topic name>/<partition id> (resolved
# through cluster.node_with_chroot) whose data is this instance's id.
#
# @param partition [#topic, #id] the partition to claim; +topic+ must
#   respond to +name+
# @return [true] when the create call reports ZOK
# @raise [Kazoo::PartitionAlreadyClaimed] when the node already exists
# @raise [Kazoo::Error] for any other return code
def claim_partition(partition)
  zk_client = cluster.zk
  owner_node = cluster.node_with_chroot("/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}")
  code = zk_client.create(path: owner_node, ephemeral: true, data: id).fetch(:rc)

  return true if code == Zookeeper::Constants::ZOK

  if code == Zookeeper::Constants::ZNODEEXISTS
    raise Kazoo::PartitionAlreadyClaimed, "Partition #{partition.topic.name}/#{partition.id} is already claimed!"
  end

  raise Kazoo::Error, "Failed to claim partition #{partition.topic.name}/#{partition.id}. Error code: #{code}"
end

#deregister ⇒ Object



180
181
182
# File 'lib/kazoo/consumergroup.rb', line 180

# Deletes this instance's node at /consumers/<group name>/ids/<id>
# (resolved through cluster.node_with_chroot).
#
# The return code of the delete call is not inspected here.
#
# @return [Object] whatever cluster.zk.delete returns
def deregister
  zk_client = cluster.zk
  instance_node = cluster.node_with_chroot("/consumers/#{group.name}/ids/#{id}")
  zk_client.delete(path: instance_node)
end

#eql?(other) ⇒ Boolean Also known as: ==

Returns:

  • (Boolean)


216
217
218
# File 'lib/kazoo/consumergroup.rb', line 216

# Two instances are equal when +other+ is a Kazoo::Consumergroup::Instance
# with an equal group and an equal id.
#
# @param other [Object] the object to compare against
# @return [Boolean]
def eql?(other)
  return false unless other.is_a?(Kazoo::Consumergroup::Instance)

  group == other.group && id == other.id
end

#hash ⇒ Object



212
213
214
# File 'lib/kazoo/consumergroup.rb', line 212

# Hash code derived from the same pair (group, id) that eql? compares,
# so equal instances land in the same Hash bucket.
#
# @return [Integer]
def hash
  identity = [group, id]
  identity.hash
end

#inspect ⇒ Object



208
209
210
# File 'lib/kazoo/consumergroup.rb', line 208

# Short human-readable description showing the group's name and this
# instance's id.
#
# @return [String]
def inspect
  group_name = group.name
  "#<Kazoo::Consumergroup::Instance group=#{group_name} id=#{id}>"
end

#register(subscription) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/kazoo/consumergroup.rb', line 153

# Registers this instance in its consumer group and makes sure an owners
# node exists for every subscribed topic.
#
# First an ephemeral node is created at /consumers/<group name>/ids/<id>
# (resolved through cluster.node_with_chroot) carrying a JSON document with
# version, timestamp, pattern and a subscription hash that maps each topic
# name to 1. Then, for each topic, /consumers/<group name>/owners/<topic name>
# is created when a stat call reports that it does not exist yet.
#
# @param subscription [Enumerable<#name>] the topics to subscribe to
# @return [Object] the value of subscription.each (the subscription itself
#   for an Array)
# @raise [Kazoo::ConsumerInstanceRegistrationFailed] when the instance node
#   cannot be created, or when creating an owners node fails for a reason
#   other than the node already existing
def register(subscription)
  # Built without splatting the whole subscription into an argument list,
  # which cannot handle very large subscriptions and would mis-flatten
  # array-valued names.
  topic_counts = subscription.each_with_object({}) { |topic, counts| counts[topic.name] = 1 }

  result = cluster.zk.create(
    path: cluster.node_with_chroot("/consumers/#{group.name}/ids/#{id}"),
    ephemeral: true,
    data: JSON.generate({
      version: 1,
      timestamp: Time.now.to_i,
      pattern: "static",
      subscription: topic_counts
    })
  )

  if result.fetch(:rc) != Zookeeper::Constants::ZOK
    raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register instance #{id} for consumer group #{group.name}! Error code: #{result.fetch(:rc)}"
  end

  subscription.each do |topic|
    owners_node = cluster.node_with_chroot("/consumers/#{group.name}/owners/#{topic.name}")
    next if cluster.zk.stat(path: owners_node).fetch(:stat).exists?

    rc = cluster.zk.create(path: owners_node).fetch(:rc)

    # stat-then-create is not atomic: another instance of the same group may
    # create the owners node in between. The node existing is exactly the
    # state we want, so ZNODEEXISTS counts as success here.
    next if rc == Zookeeper::Constants::ZOK || rc == Zookeeper::Constants::ZNODEEXISTS

    raise Kazoo::ConsumerInstanceRegistrationFailed, "Failed to register subscription of #{topic.name} for consumer group #{group.name}! Error code: #{rc}"
  end
end

#registered? ⇒ Boolean

Returns:

  • (Boolean)


148
149
150
151
# File 'lib/kazoo/consumergroup.rb', line 148

# Checks whether this instance's node at /consumers/<group name>/ids/<id>
# (resolved through cluster.node_with_chroot) currently exists.
#
# @return [Boolean] the result of exists? on the stat returned by the
#   cluster's zk client
def registered?
  cluster.zk
         .stat(path: cluster.node_with_chroot("/consumers/#{group.name}/ids/#{id}"))
         .fetch(:stat)
         .exists?
end

#release_partition(partition) ⇒ Object



201
202
203
204
205
206
# File 'lib/kazoo/consumergroup.rb', line 201

# Releases +partition+ by deleting the node at
# /consumers/<group name>/owners/<topic name>/<partition id> (resolved
# through cluster.node_with_chroot).
#
# @param partition [#topic, #id] the partition to release; +topic+ must
#   respond to +name+
# @return [nil] when the delete call reports ZOK
# @raise [Kazoo::Error] when the delete call reports any other code
def release_partition(partition)
  zk_client = cluster.zk
  owner_node = cluster.node_with_chroot("/consumers/#{group.name}/owners/#{partition.topic.name}/#{partition.id}")
  code = zk_client.delete(path: owner_node).fetch(:rc)
  return if code == Zookeeper::Constants::ZOK

  raise Kazoo::Error, "Failed to release partition #{partition.topic.name}/#{partition.id}. Error code: #{code}"
end