Class: Poseidon::ConsumerGroup
- Inherits: Object
  - Object
  - Poseidon::ConsumerGroup
- Defined in: lib/poseidon/consumer_group.rb
Overview
A ConsumerGroup operates on all partitions of a single topic. The goal is to ensure each topic message is consumed only once, regardless of the number of consumer instances within a cluster, as described in: kafka.apache.org/documentation.html#distributionimpl.
The ConsumerGroup internally creates multiple PartitionConsumer instances. It uses ZooKeeper and follows a simple consumer rebalancing algorithm which allows all the consumers in a group to come to a consensus on which consumer is consuming which partitions. Each ConsumerGroup can ‘claim’ 0-n partitions and will consume their messages until another ConsumerGroup instance joins or leaves the cluster.
Please note: ConsumerGroups themselves don’t implement any threading or concurrency. When consuming messages, they simply round-robin across the claimed partitions. If you wish to parallelize consumption, simply create multiple ConsumerGroup instances. The built-in consensus algorithm will automatically rebalance the available partitions between them, and you can then decide for yourself whether to run them in multiple threads or processes, ideally on multiple boxes.
Contrary to what the Kafka documentation states, consumer rebalancing is only triggered by the addition or removal of consumers within the same group; the addition of broker nodes and/or partitions *does not currently trigger* a rebalancing cycle.
Defined Under Namespace
Classes: Consumer
Constant Summary
- DEFAULT_CLAIM_TIMEOUT = 30
- DEFAULT_LOOP_DELAY = 1
Instance Attribute Summary
- #name ⇒ Object readonly
- #options ⇒ Object readonly
- #pool ⇒ Object readonly
- #topic ⇒ Object readonly
- #zk ⇒ Object readonly
Class Method Summary
- .pick(pnum, cids, id) ⇒ Range, NilClass
  Selectable range, if any.
Instance Method Summary
- #checkout(opts = {}) {|consumer| ... } ⇒ Boolean
  Checks out a single partition consumer.
- #claimed ⇒ Array<Integer>
  Partitions currently claimed and consumed by this group instance.
- #close ⇒ Object (private)
  Closes the consumer group gracefully, only really useful in tests.
- #commit(partition, offset) ⇒ Object
  Commits the latest offset for a partition.
- #fetch(opts = {}) {|partition, messages| ... } ⇒ Boolean
  Convenience method to fetch messages from the broker.
- #fetch_loop(opts = {}) {|partition, messages| ... } ⇒ Object
  Initializes an infinite fetch loop.
- #id ⇒ String
  A globally unique identifier.
- #initialize(name, brokers, zookeepers, topic, options = {}) ⇒ ConsumerGroup (constructor)
  Create a new consumer group, which processes all partitions of the specified topic.
- #leader(partition) ⇒ Poseidon::Protocol::Broker
  The leader for the given partition.
- #metadata ⇒ Poseidon::ClusterMetadata
  Cluster metadata.
- #offset(partition) ⇒ Integer
  The latest stored offset for the given partition.
- #partitions ⇒ Array<Poseidon::Protocol::PartitionMetadata>
  Partitions sorted by broker address (so partitions on the same broker are clustered together).
- #register! ⇒ Boolean
  True if registration was successful, false if already registered.
- #registered? ⇒ Boolean
  True if registered.
- #registries ⇒ Hash<Symbol,String>
  Registry paths.
- #reload ⇒ Object
  Reloads metadata/broker/partition information.
- #topic_metadata ⇒ Poseidon::TopicMetadata
  Topic metadata.
Constructor Details
#initialize(name, brokers, zookeepers, topic, options = {}) ⇒ ConsumerGroup
Create a new consumer group, which processes all partitions of the specified topic.
# File 'lib/poseidon/consumer_group.rb', line 97
def initialize(name, brokers, zookeepers, topic, options = {})
  @name       = name
  @topic      = topic
  @zk         = ::ZK.new(zookeepers.join(","))
  # Poseidon::BrokerPool doesn't provide default value for this option
  # Configuring default value like this isn't beautiful, though.. by kssminus
  options[:socket_timeout_ms] ||= 10000
  @options    = options
  @consumers  = []
  @pool       = ::Poseidon::BrokerPool.new(id, brokers, options[:socket_timeout_ms])
  @mutex      = Mutex.new
  @registered = false

  register! unless options.delete(:register) == false
end
Instance Attribute Details
#name ⇒ Object (readonly)
# File 'lib/poseidon/consumer_group.rb', line 66
def name
  @name
end
#options ⇒ Object (readonly)
# File 'lib/poseidon/consumer_group.rb', line 78
def options
  @options
end
#pool ⇒ Object (readonly)
# File 'lib/poseidon/consumer_group.rb', line 72
def pool
  @pool
end
#topic ⇒ Object (readonly)
# File 'lib/poseidon/consumer_group.rb', line 69
def topic
  @topic
end
#zk ⇒ Object (readonly)
# File 'lib/poseidon/consumer_group.rb', line 75
def zk
  @zk
end
Class Method Details
.pick(pnum, cids, id) ⇒ Range, NilClass
Returns selectable range, if any.
# File 'lib/poseidon/consumer_group.rb', line 51
def self.pick(pnum, cids, id)
  cids = cids.sort
  pos  = cids.index(id)
  return unless pos && pos < cids.size

  step = pnum.fdiv(cids.size).ceil
  frst = pos*step
  last = (pos+1)*step-1
  last = pnum-1 if last > pnum-1
  return if last < 0 || last < frst

  (frst..last)
end
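To make the range arithmetic concrete, the following sketch copies the method body above into a hypothetical `PickSketch` module (an assumption made only so that the example runs without the gem or a ZooKeeper connection) and evaluates it for a few inputs. The consumer ids `c1`..`c3` are made up.

```ruby
# Standalone copy of the .pick logic shown above; PickSketch is a
# hypothetical wrapper so the example runs without the gem installed.
module PickSketch
  def self.pick(pnum, cids, id)
    cids = cids.sort
    pos  = cids.index(id)
    return unless pos && pos < cids.size

    step = pnum.fdiv(cids.size).ceil
    frst = pos * step
    last = (pos + 1) * step - 1
    last = pnum - 1 if last > pnum - 1
    return if last < 0 || last < frst

    (frst..last)
  end
end

consumers = %w[c1 c2 c3]

# 5 partitions, 3 consumers: step is ceil(5/3) = 2
five = consumers.map { |id| PickSketch.pick(5, consumers, id) }
# 2 partitions, 3 consumers: the last consumer is left without a claim
two = consumers.map { |id| PickSketch.pick(2, consumers, id) }
# An id that is not in the list gets nothing
unknown = PickSketch.pick(5, consumers, "c9")

p five    # [0..1, 2..3, 4..4]
p two     # [0..0, 1..1, nil]
p unknown # nil
```

Because the step is rounded up, consumers that sort early receive full-sized ranges and the remainder falls to the later ones, which can end up with a shorter range or none at all.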
Instance Method Details
#checkout(opts = {}) {|consumer| ... } ⇒ Boolean
Checks out a single partition consumer. Round-robins between claimed partitions.
# File 'lib/poseidon/consumer_group.rb', line 229
def checkout(opts = {})
  consumer = nil
  commit   = @mutex.synchronize do
    consumer = @consumers.shift
    return false unless consumer

    @consumers.push consumer
    yield consumer
  end

  unless opts[:commit] == false || commit == false
    commit consumer.partition, consumer.offset
  end
  true
end
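The rotation and the commit rule in the listing above can be tried out in isolation. The sketch below is not the library class: `RoundRobinSketch`, its `Consumer` struct and the `committed` hash are hypothetical stand-ins that keep everything in memory, so only the shift/push rotation and the "skip the commit when the block returns false or `commit: false` is passed" behaviour are modelled.

```ruby
# RoundRobinSketch is a hypothetical stand-in that mirrors only the
# shift/push rotation and the commit rule of #checkout shown above.
class RoundRobinSketch
  Consumer = Struct.new(:partition, :offset)

  attr_reader :committed

  def initialize(partitions)
    @consumers = partitions.map { |n| Consumer.new(n, 0) }
    @mutex     = Mutex.new
    @committed = {}
  end

  def checkout(opts = {})
    consumer = nil
    result = @mutex.synchronize do
      consumer = @consumers.shift
      return false unless consumer

      @consumers.push consumer # rotate to the back of the queue
      yield consumer
    end

    # Commit unless disabled via option or the block returned false
    unless opts[:commit] == false || result == false
      @committed[consumer.partition] = consumer.offset
    end
    true
  end
end

group = RoundRobinSketch.new([0, 1, 2])
seen  = []

4.times do
  group.checkout do |c|
    seen << c.partition
    c.offset += 10
    c.partition != 1 # returning false skips the commit for partition 1
  end
end

# With nothing claimed, the block is never called and false is returned
empty = RoundRobinSketch.new([]).checkout { |_c| raise "never yielded" }

p seen            # [0, 1, 2, 0]
p group.committed # {0=>20, 2=>10}
p empty           # false
```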
#claimed ⇒ Array<Integer>
Partitions currently claimed and consumed by this group instance.
# File 'lib/poseidon/consumer_group.rb', line 207
def claimed
  @consumers.map(&:partition).sort
end
#close ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Closes the consumer group gracefully, only really useful in tests.
# File 'lib/poseidon/consumer_group.rb', line 167
def close
  @mutex.synchronize { release_all! }
  zk.close
end
#commit(partition, offset) ⇒ Object
Commits the latest offset for a partition.
# File 'lib/poseidon/consumer_group.rb', line 188
def commit(partition, offset)
  zk.set offset_path(partition), offset.to_s
rescue ZK::Exceptions::NoNode
  zk.create offset_path(partition), offset.to_s, ignore: :node_exists
end
#fetch(opts = {}) {|partition, messages| ... } ⇒ Boolean
Convenience method to fetch messages from the broker. Round-robins between claimed partitions.
# File 'lib/poseidon/consumer_group.rb', line 265
def fetch(opts = {})
  checkout(opts) do |consumer|
    yield consumer.partition, consumer.fetch
  end
end
#fetch_loop(opts = {}) {|partition, messages| ... } ⇒ Object
Initializes an infinite fetch loop. This method blocks!
Will wait for ‘loop_delay’ seconds after each failed fetch. This may happen when there is no new data or when the consumer hasn’t claimed any partitions.
SPECIAL ATTENTION: When ‘breaking out’ of the loop, you must do it before processing the messages, as the last offset will not be committed. Please see examples below.
# File 'lib/poseidon/consumer_group.rb', line 335
def fetch_loop(opts = {})
  delay = opts[:loop_delay] || options[:loop_delay] || DEFAULT_LOOP_DELAY

  loop do
    mp = false
    ok = fetch(opts) do |n, messages|
      mp = !messages.empty?
      yield n, messages
    end

    # Yield over an empty array if nothing claimed,
    # to allow user to e.g. break out of the loop
    unless ok
      yield -1, []
    end

    # Sleep if either not claimed or nothing returned
    unless ok && mp
      sleep delay
    end
  end
end
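The warning about breaking out of the loop can be illustrated without a broker. `LoopSketch` below is a hypothetical in-memory stand-in, not the library class: it hands out fixed batches for partition 0 and advances its offset only after the block returns normally, which is the one property of #fetch_loop that matters here. It also omits the sleep, so the block is expected to break eventually.

```ruby
# LoopSketch is a hypothetical in-memory stand-in: it yields batches like
# #fetch_loop and records an offset only after the block returns normally.
class LoopSketch
  attr_reader :offset

  def initialize(batches)
    @batches = batches
    @offset  = 0
  end

  def fetch_loop
    loop do
      batch = @batches.first
      if batch.nil?
        yield(-1, []) # nothing to hand out; gives the caller a chance to break
        next
      end
      yield 0, batch
      # Reached only if the block did not break out
      @batches.shift
      @offset += batch.size
    end
  end
end

# Wrong: processes the batch, then breaks, so the offset is never stored
wrong     = LoopSketch.new([%w[a b], %w[c]])
processed = []
wrong.fetch_loop do |_n, messages|
  processed.concat(messages)
  break if processed.size >= 2
end

# Right: decide whether to stop before touching the new batch
right = LoopSketch.new([%w[a b], %w[c]])
done  = []
right.fetch_loop do |_n, messages|
  break if done.size >= 2
  done.concat(messages)
end

p [processed, wrong.offset] # [["a", "b"], 0]
p [done, right.offset]      # [["a", "b"], 2]
```

In the first variant two messages were handled but the stored offset is still 0, so they would be delivered again on the next run. In the second variant the handled messages and the stored offset agree.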
#id ⇒ String
Returns a globally unique identifier.
# File 'lib/poseidon/consumer_group.rb', line 114
def id
  @id ||= [name, Poseidon::Cluster.guid].join("-")
end
#leader(partition) ⇒ Poseidon::Protocol::Broker
Returns the leader for the given partition.
# File 'lib/poseidon/consumer_group.rb', line 174
def leader(partition)
  metadata.lead_broker_for_partition(topic, partition)
end
#metadata ⇒ Poseidon::ClusterMetadata
Returns cluster metadata.
# File 'lib/poseidon/consumer_group.rb', line 128
def metadata
  @metadata ||= Poseidon::ClusterMetadata.new.tap {|m| m.update pool.fetch_metadata([topic]) }
end
#offset(partition) ⇒ Integer
Returns the latest stored offset for the given partition.
# File 'lib/poseidon/consumer_group.rb', line 180
def offset(partition)
  data, _ = zk.get offset_path(partition), ignore: :no_node
  data.to_i
end
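Read together, #commit and #offset form a small protocol: the first commit for a partition falls back to creating the node, and a partition that was never committed reads as 0 because `nil.to_i` is 0. The sketch below replays that with a hypothetical in-memory `OffsetStoreSketch`; it is not the zk gem's API, it assumes the lookup yields nil for a missing node, and the `offset_path` layout and the group and topic names are assumptions for illustration.

```ruby
# OffsetStoreSketch is a hypothetical in-memory stand-in for the ZooKeeper
# calls used by #commit and #offset; it is not the zk gem's API.
class OffsetStoreSketch
  class NoNode < StandardError; end

  def initialize
    @nodes = {}
  end

  # Strict update: fails if the node was never created
  def set(path, data)
    raise NoNode, path unless @nodes.key?(path)
    @nodes[path] = data
  end

  # Creates the node, leaving an existing one untouched
  def create(path, data)
    @nodes[path] ||= data
  end

  # Returns nil for a missing node instead of raising
  def get(path)
    @nodes[path]
  end
end

STORE = OffsetStoreSketch.new

# Assumed path layout, for illustration only
def offset_path(partition)
  "/consumers/my-group/offsets/my-topic/#{partition}"
end

def commit(partition, offset)
  STORE.set offset_path(partition), offset.to_s
rescue OffsetStoreSketch::NoNode
  STORE.create offset_path(partition), offset.to_s
end

def offset(partition)
  STORE.get(offset_path(partition)).to_i # nil.to_i is 0
end

before = offset(3) # never committed
commit(3, 120)     # first commit falls back to create
commit(3, 150)     # later commits update in place
after = offset(3)

p before # 0
p after  # 150
```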
#partitions ⇒ Array<Poseidon::Protocol::PartitionMetadata>
Partitions sorted by broker address (so partitions on the same broker are clustered together).
# File 'lib/poseidon/consumer_group.rb', line 196
def partitions
  return [] unless topic_metadata

  topic_metadata.available_partitions.sort_by do |part|
    broker = metadata.brokers[part.leader]
    [broker.host, broker.port].join(":")
  end
end
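The effect of the sort key can be seen on plain data. In this sketch `Broker` and `Partition` are hypothetical structs that model only the fields the sort touches, and the host names are made up.

```ruby
# Broker and Partition are hypothetical stand-ins for the metadata objects;
# only the fields used by the sort are modelled.
Broker    = Struct.new(:host, :port)
Partition = Struct.new(:id, :leader)

brokers = {
  1 => Broker.new("kafka-b", 9092),
  2 => Broker.new("kafka-a", 9092),
}
parts = [
  Partition.new(0, 1),
  Partition.new(1, 2),
  Partition.new(2, 1),
  Partition.new(3, 2),
]

# Same key as in #partitions: "host:port" of the partition's leader
sorted = parts.sort_by do |part|
  broker = brokers[part.leader]
  [broker.host, broker.port].join(":")
end

p sorted.map(&:leader) # [2, 2, 1, 1]
```

Partitions led by the same broker end up adjacent. Ruby's `sort_by` is not guaranteed to be stable, so the relative order of partitions that share a broker should not be relied on, and since the key is a string, ports are compared lexicographically rather than numerically.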
#register! ⇒ Boolean
Returns true if registration was successful, false if already registered.
# File 'lib/poseidon/consumer_group.rb', line 143
def register!
  return false if registered?

  # Register instance
  registries.each do |_, path|
    zk.mkdir_p(path)
  end
  zk.create(consumer_path, "{}", ephemeral: true)
  zk.register(registries[:consumer]) {|_| rebalance! }

  # Rebalance
  rebalance!
  @registered = true
end
#registered? ⇒ Boolean
Returns true if registered.
# File 'lib/poseidon/consumer_group.rb', line 138
def registered?
  @registered
end
#registries ⇒ Hash<Symbol,String>
Returns registry paths.
# File 'lib/poseidon/consumer_group.rb', line 119
def registries
  @registries ||= {
    consumer: "/consumers/#{name}/ids",
    owner:    "/consumers/#{name}/owners/#{topic}",
    offset:   "/consumers/#{name}/offsets/#{topic}",
  }
end
#reload ⇒ Object
Reloads metadata/broker/partition information.
# File 'lib/poseidon/consumer_group.rb', line 159
def reload
  @metadata = @topic_metadata = nil
  self
end
#topic_metadata ⇒ Poseidon::TopicMetadata
Returns topic metadata.
# File 'lib/poseidon/consumer_group.rb', line 133
def topic_metadata
  @topic_metadata ||= metadata.metadata_for_topics([topic])[topic]
end