Class: Karafka::Connection::Client
- Inherits: Object
- Defined in: lib/karafka/connection/client.rb
Overview
An abstraction layer on top of the rdkafka consumer.
It is thread-safe and provides safety measures so we won't end up operating on a closed consumer instance, which would crash the Ruby VM process.
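The "closed consumer" protection mentioned above can be illustrated with a minimal, self-contained sketch (this is not Karafka's actual implementation; `SafeClientSketch` and its methods are hypothetical names): every operation checks a closed flag under a mutex, so a call racing with `#close` becomes a no-op instead of touching a freed consumer handle.

```ruby
require 'securerandom'

# Illustrative sketch of the safety pattern: operations on a closed client
# short-circuit to nil instead of reaching the underlying (freed) consumer.
class SafeClientSketch
  def initialize
    @id = SecureRandom.hex(6)
    @closed = false
    @mutex = Mutex.new
  end

  def close
    @mutex.synchronize { @closed = true }
  end

  # Yields only while the client is open; returns nil once it is closed
  def with_open_consumer
    @mutex.synchronize do
      return nil if @closed

      yield
    end
  end
end

client = SafeClientSketch.new
client.with_open_consumer { :polled } # => :polled
client.close
client.with_open_consumer { :polled } # => nil
```

The mutex matters because listener threads and worker threads may both reach the client; without it, a check-then-act on the closed flag would still race.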
Instance Attribute Summary collapse
-
#id ⇒ String
readonly
Id of the client.
-
#name ⇒ String
readonly
Underlying consumer name.
-
#rebalance_manager ⇒ Object
readonly
Returns the value of attribute rebalance_manager.
Instance Method Summary collapse
-
#assignment_lost? ⇒ Boolean
True if our current assignment has been lost involuntarily.
-
#batch_poll ⇒ Karafka::Connection::MessagesBuffer
Fetches messages within boundaries defined by the settings (time, size, topics, etc.).
-
#commit_offsets(async: true) ⇒ Boolean
Commits the offset on a current consumer in a non-blocking or blocking way.
-
#commit_offsets! ⇒ Object
Commits offset in a synchronous way.
-
#initialize(subscription_group) ⇒ Karafka::Connection::Client
constructor
Creates a new consumer instance.
-
#mark_as_consumed(message) ⇒ Boolean
Marks given message as consumed.
-
#mark_as_consumed!(message) ⇒ Boolean
Marks a given message as consumed and commits the offsets in a blocking way.
-
#pause(topic, partition, offset) ⇒ Object
Pauses a given partition and moves back to the last successfully processed offset.
-
#ping ⇒ Object
Runs a single poll ignoring all the potential errors. This is used as a keep-alive in the shutdown stage; any errors that happen here are irrelevant from the shutdown process perspective.
-
#reset ⇒ Object
Closes and resets the client completely.
-
#resume(topic, partition) ⇒ Object
Resumes processing of a given topic partition after it was paused.
-
#seek(message) ⇒ Object
Seek to a particular message.
-
#stop ⇒ Object
Gracefully stops topic consumption.
-
#store_offset(message) ⇒ Object
Stores offset for a given partition of a given topic based on the provided message.
Constructor Details
#initialize(subscription_group) ⇒ Karafka::Connection::Client
Creates a new consumer instance.
# File 'lib/karafka/connection/client.rb', line 39

def initialize(subscription_group)
  @id = SecureRandom.hex(6)
  # Name is set when we build the consumer
  @name = ''
  @closed = false
  @subscription_group = subscription_group
  @buffer = RawMessagesBuffer.new
  @rebalance_manager = RebalanceManager.new
  @kafka = build_consumer
  # There are a few operations that can happen in parallel from the listener threads as
  # well as from the workers. They are not fully thread-safe because they may be composed
  # of a few calls to Kafka or a few internal state changes. That is why we mutex them.
  # It mostly revolves around pausing and resuming.
  @mutex = Mutex.new
  # We need to keep track of what we have paused for resuming.
  # In case we lose a partition, we still need to resume it, otherwise it won't be fetched
  # again if we get reassigned to it later on. We need to keep them because after
  # revocation we may no longer be able to fetch them from Kafka. We could rebuild them,
  # but it is easier to just keep them here and use them when they cannot be obtained.
  @paused_tpls = Hash.new { |h, k| h[k] = {} }
end
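The `@paused_tpls` initialization in the constructor uses a `Hash` with a default block, giving a nested topic-to-partition map where the inner hash is created on first access. A small standalone demonstration of that idiom:

```ruby
# Same structure as @paused_tpls: first write to any topic key auto-creates
# an inner hash keyed by partition, so no explicit initialization is needed.
paused_tpls = Hash.new { |h, k| h[k] = {} }

paused_tpls['events'][0] = :tpl_for_events_0 # inner hash created on demand
paused_tpls['events'][1] = :tpl_for_events_1

paused_tpls['events'].keys      # => [0, 1]
paused_tpls.key?('other_topic') # => false (#key? does not trigger the default block)
```

Note that a plain read like `paused_tpls['other_topic']` *does* run the default block and inserts an empty inner hash, which is harmless here but worth knowing when iterating over keys.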
Instance Attribute Details
#id ⇒ String (readonly)
Returns id of the client.
# File 'lib/karafka/connection/client.rb', line 18

def id
  @id
end
#name ⇒ String (readonly)
Consumer name may change in case we regenerate it
Returns underlying consumer name.
# File 'lib/karafka/connection/client.rb', line 15

def name
  @name
end
#rebalance_manager ⇒ Object (readonly)
Returns the value of attribute rebalance_manager.
# File 'lib/karafka/connection/client.rb', line 11

def rebalance_manager
  @rebalance_manager
end
Instance Method Details
#assignment_lost? ⇒ Boolean
Returns true if our current assignment has been lost involuntarily.
# File 'lib/karafka/connection/client.rb', line 117

def assignment_lost?
  @kafka.assignment_lost?
end
#batch_poll ⇒ Karafka::Connection::MessagesBuffer
This method should not be executed from many threads at the same time
Fetches messages within boundaries defined by the settings (time, size, topics, etc).
# File 'lib/karafka/connection/client.rb', line 66

def batch_poll
  time_poll = TimeTrackers::Poll.new(@subscription_group.max_wait_time)

  @buffer.clear
  @rebalance_manager.clear

  loop do
    time_poll.start

    # Don't fetch more messages if we do not have any time left
    break if time_poll.exceeded?
    # Don't fetch more messages if we've fetched max as we've wanted
    break if @buffer.size >= @subscription_group.max_messages

    # Fetch message within our time boundaries
    message = poll(time_poll.remaining)

    # Put a message to the buffer if there is one
    @buffer << message if message

    # Upon polling, the rebalance manager might have been updated.
    # If partition revocation happens, we need to remove messages from revoked partitions
    # as well as ensure we do not have duplicates due to the offset reset for partitions
    # that we got assigned.
    # We also do an early break, so the information about the rebalance is used as soon
    # as possible
    if @rebalance_manager.changed?
      break
    end

    # Track time spent on all of the processing and polling
    time_poll.checkpoint

    # Finally, once we've (potentially) removed revoked partitions, if no messages were
    # returned we can break.
    # Worth keeping in mind that the rebalance manager might have been updated despite no
    # messages being returned during a poll
    break unless message
  end

  @buffer
end
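The control flow of `batch_poll` can be sketched with a self-contained, simplified stand-in: poll until the time budget is exhausted, the size limit is hit, or the source comes back empty, accumulating into a buffer. `TimeBudget`, `batch_poll_sketch`, and the array-based "poll" are illustrative assumptions, not Karafka APIs.

```ruby
# Monotonic-clock time budget, a simplified stand-in for TimeTrackers::Poll
class TimeBudget
  def initialize(max_wait_ms)
    @deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + (max_wait_ms / 1_000.0)
  end

  def exceeded?
    Process.clock_gettime(Process::CLOCK_MONOTONIC) >= @deadline
  end
end

def batch_poll_sketch(source, max_wait_ms:, max_messages:)
  budget = TimeBudget.new(max_wait_ms)
  buffer = []

  loop do
    # Stop when out of time or when we have fetched as much as we wanted
    break if budget.exceeded?
    break if buffer.size >= max_messages

    message = source.shift # stand-in for poll(remaining)
    buffer << message if message

    # An empty poll means no more data right now: return what we have
    break unless message
  end

  buffer
end

batch_poll_sketch([:m1, :m2, :m3], max_wait_ms: 1_000, max_messages: 2) # => [:m1, :m2]
```

The real method additionally clears the buffer up front and breaks early on rebalance, but the budget/size/empty-poll exit conditions follow the same shape.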
#commit_offsets(async: true) ⇒ Boolean
This will commit all the offsets for the whole consumer. To achieve granular control over where the offset should be for particular topic partitions, store_offset should be used to store new offsets only when we want them to be flushed.
With async, this method may return `true` despite involuntary partition revocation, as it does not resolve `assignment_lost?`. It returns only the result of the commit operation.
Commits the offset on a current consumer in a non-blocking or blocking way.
# File 'lib/karafka/connection/client.rb', line 134

def commit_offsets(async: true)
  internal_commit_offsets(async: async)
end
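The store-then-commit separation described in the note above can be sketched with an in-memory stand-in (`OffsetStoreSketch` is illustrative, not a Karafka class): offsets are first stored per topic/partition and only flushed when commit is called.

```ruby
# Sketch of granular offset control: store marks positions locally,
# commit flushes everything stored so far in one operation.
class OffsetStoreSketch
  attr_reader :committed

  def initialize
    @stored = Hash.new { |h, k| h[k] = {} } # topic => { partition => offset }
    @committed = {}
  end

  def store_offset(topic, partition, offset)
    @stored[topic][partition] = offset
  end

  # Flushes all stored offsets, like commit_offsets flushing the consumer state
  def commit_offsets
    @committed = @stored.transform_values(&:dup)
    true
  end
end

store = OffsetStoreSketch.new
store.store_offset('events', 0, 41)
store.store_offset('events', 1, 7)
store.committed            # => {} (nothing flushed yet)
store.commit_offsets       # => true
store.committed['events']  # => {0 => 41, 1 => 7}
```

This mirrors why `store_offset` exists as a separate public method: you decide which positions advance, and the commit is a single consumer-wide flush.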
#commit_offsets! ⇒ Object
Commits offset in a synchronous way.
# File 'lib/karafka/connection/client.rb', line 141

def commit_offsets!
  commit_offsets(async: false)
end
#mark_as_consumed(message) ⇒ Boolean
This method won't trigger automatic offset commits, relying instead on the offset check-pointing trigger that happens with each processed batch. It will, however, check the `librdkafka` assignment ownership to increase accuracy for involuntary revocations.
Marks given message as consumed.
# File 'lib/karafka/connection/client.rb', line 271

def mark_as_consumed(message)
  store_offset(message) && !assignment_lost?
end
#mark_as_consumed!(message) ⇒ Boolean
Marks a given message as consumed and commits the offsets in a blocking way.
# File 'lib/karafka/connection/client.rb', line 279

def mark_as_consumed!(message)
  return false unless mark_as_consumed(message)

  commit_offsets!
end
#pause(topic, partition, offset) ⇒ Object
This will pause indefinitely and requires manual `#resume`
Pauses given partition and moves back to last successful offset processed.
# File 'lib/karafka/connection/client.rb', line 162

def pause(topic, partition, offset)
  @mutex.synchronize do
    # Do not pause if the client got closed, would not change anything
    return if @closed

    pause_msg = Messages::Seek.new(topic, partition, offset)

    internal_commit_offsets(async: true)

    # Here we do not use our cached tpls because we should not try to pause something we
    # do not own anymore.
    tpl = topic_partition_list(topic, partition)

    return unless tpl

    Karafka.monitor.instrument(
      'client.pause',
      caller: self,
      subscription_group: @subscription_group,
      topic: topic,
      partition: partition,
      offset: offset
    )

    @paused_tpls[topic][partition] = tpl

    @kafka.pause(tpl)

    internal_seek(pause_msg)
  end
end
#ping ⇒ Object
Runs a single poll ignoring all the potential errors. This is used as a keep-alive in the shutdown stage; any errors that happen here are irrelevant from the shutdown process perspective.
This is used only to trigger rebalance callbacks
# File 'lib/karafka/connection/client.rb', line 299

def ping
  poll(100)
rescue Rdkafka::RdkafkaError
  nil
end
#reset ⇒ Object
Closes and resets the client completely.
# File 'lib/karafka/connection/client.rb', line 286

def reset
  close

  @closed = false
  @paused_tpls.clear
  @kafka = build_consumer
end
#resume(topic, partition) ⇒ Object
Resumes processing of a given topic partition after it was paused.
# File 'lib/karafka/connection/client.rb', line 197

def resume(topic, partition)
  @mutex.synchronize do
    return if @closed

    # We now commit offsets on rebalances, thus we can do it async just to make sure
    internal_commit_offsets(async: true)

    # If we were not able, let's try to reuse the one we have (if we have)
    tpl = topic_partition_list(topic, partition) || @paused_tpls[topic][partition]

    return unless tpl

    # If we did not have it, it means we never paused this partition, thus no resume
    # should happen in the first place
    return unless @paused_tpls[topic].delete(partition)

    Karafka.monitor.instrument(
      'client.resume',
      caller: self,
      subscription_group: @subscription_group,
      topic: topic,
      partition: partition
    )

    @kafka.resume(tpl)
  end
end
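The pause/resume bookkeeping above (cache the tpl on pause, fall back to the cache on resume, and only resume partitions that were actually paused) can be sketched standalone. `PauseTrackerSketch` and its method names are illustrative, not Karafka internals.

```ruby
# Sketch of the @paused_tpls bookkeeping: pause caches the topic-partition
# list so resume can still find it after a rebalance revoked the assignment.
class PauseTrackerSketch
  def initialize
    @paused_tpls = Hash.new { |h, k| h[k] = {} }
    @mutex = Mutex.new
  end

  def pause(topic, partition, tpl)
    @mutex.synchronize { @paused_tpls[topic][partition] = tpl }
  end

  # Returns the tpl to resume with, or nil if this partition was never paused.
  # Prefers a live tpl (current assignment) over the cached one when available.
  def resume(topic, partition, live_tpl = nil)
    @mutex.synchronize do
      cached = @paused_tpls[topic].delete(partition)
      return nil unless cached

      live_tpl || cached
    end
  end
end

tracker = PauseTrackerSketch.new
tracker.pause('events', 0, :tpl0)
tracker.resume('events', 0) # => :tpl0 (cached tpl used when the live one is gone)
tracker.resume('events', 0) # => nil (already resumed, nothing to do)
```

The `delete` on resume is what makes resuming idempotent: a second resume for the same partition is a no-op, matching the `return unless @paused_tpls[topic].delete(partition)` guard in the real method.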
#seek(message) ⇒ Object
Please note that if you are seeking to a time offset, getting the offset is a blocking operation.
Seek to a particular message. The next poll on the topic/partition will return the message at the given offset.
# File 'lib/karafka/connection/client.rb', line 151

def seek(message)
  @mutex.synchronize { internal_seek(message) }
end
#stop ⇒ Object
Stopping running consumers without a really important reason is not recommended, as until all the consumers are stopped, the server will keep running while serving only part of the messages.
Gracefully stops topic consumption.
# File 'lib/karafka/connection/client.rb', line 230

def stop
  # This ensures that we do not stop the underlying client until it passes the first
  # rebalance for cooperative-sticky. Otherwise librdkafka may crash.
  #
  # We set a timeout just in case the rebalance would never happen or would last for an
  # extensive time period.
  #
  # @see https://github.com/confluentinc/librdkafka/issues/4312
  if @subscription_group.kafka[:'partition.assignment.strategy'] == 'cooperative-sticky'
    active_wait = false

    (COOPERATIVE_STICKY_MAX_WAIT / 100).times do
      # If we're past the first rebalance, no need to wait
      if @rebalance_manager.active?
        # We give it a bit of time because librdkafka has a tendency to do some
        # post-callback work that from its perspective is still under rebalance
        sleep(5) if active_wait

        break
      end

      active_wait = true

      # poll to trigger potential rebalances that could occur during stopping and to
      # trigger potential callbacks
      poll(100)

      sleep(0.1)
    end
  end

  close
end
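The bounded wait inside `stop` (poll repeatedly until a condition holds, but give up after a fixed number of attempts so shutdown cannot hang forever) is a general pattern worth isolating. A minimal sketch, with `wait_until` as a hypothetical helper and timings chosen for illustration:

```ruby
# Polls a condition up to max_attempts times; returns the zero-based attempt
# index on success, or nil if the condition never became true (timeout).
def wait_until(max_attempts:, interval: 0)
  max_attempts.times do |attempt|
    return attempt if yield

    sleep(interval) if interval.positive?
  end

  nil
end

counter = 0
wait_until(max_attempts: 10) { counter += 1; counter >= 3 } # => 2 (true on third try)
wait_until(max_attempts: 2)  { false }                      # => nil (timed out)
```

The real method layers two extra concerns on top of this shape: each iteration actively calls `poll(100)` to drive rebalance callbacks forward, and on success it sleeps briefly to let librdkafka finish post-rebalance work before closing.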
#store_offset(message) ⇒ Object
Stores offset for a given partition of a given topic based on the provided message.
# File 'lib/karafka/connection/client.rb', line 112

def store_offset(message)
  internal_store_offset(message)
end