Class: Karafka::Connection::Client
- Inherits: Object
- Defined in: lib/karafka/connection/client.rb
Overview
An abstraction layer on top of the rdkafka consumer.
It is thread-safe and provides safety measures so we do not end up operating on a closed consumer instance, as that would crash the Ruby VM process.
Instance Attribute Summary
- #id ⇒ String (readonly): Id of the client.
- #name ⇒ String (readonly): Underlying consumer name.
- #rebalance_manager ⇒ Object (readonly): Returns the value of attribute rebalance_manager.
Instance Method Summary
- #batch_poll ⇒ Karafka::Connection::MessagesBuffer: Fetches messages within boundaries defined by the settings (time, size, topics, etc.).
- #commit_offsets(async: true) ⇒ Boolean: Commits the offsets on the current consumer in a non-blocking or blocking way.
- #commit_offsets! ⇒ Object: Commits offsets in a synchronous way.
- #initialize(subscription_group) ⇒ Karafka::Connection::Rdk::Consumer (constructor): Creates a new consumer instance.
- #mark_as_consumed(message) ⇒ Boolean: Marks a given message as consumed.
- #mark_as_consumed!(message) ⇒ Boolean: Marks a given message as consumed and commits the offsets in a blocking way.
- #pause(topic, partition, offset) ⇒ Object: Pauses a given partition and moves back to the last successfully processed offset.
- #ping ⇒ Object: Runs a single poll, ignoring all potential errors. This is used as a keep-alive in the shutdown stage, where any errors are irrelevant from the shutdown process perspective.
- #reset ⇒ Object: Closes and resets the client completely.
- #resume(topic, partition) ⇒ Object: Resumes processing of a given topic partition after it was paused.
- #seek(message) ⇒ Object: Seeks to a particular message.
- #stop ⇒ Object: Gracefully stops topic consumption.
- #store_offset(message) ⇒ Object: Stores the offset for a given partition of a given topic based on the provided message.
Constructor Details
#initialize(subscription_group) ⇒ Karafka::Connection::Rdk::Consumer
Creates a new consumer instance.
# File 'lib/karafka/connection/client.rb', line 34

def initialize(subscription_group)
  @id = SecureRandom.hex(6)
  # Name is set when we build consumer
  @name = ''
  @mutex = Mutex.new
  @closed = false
  @subscription_group = subscription_group
  @buffer = RawMessagesBuffer.new
  @rebalance_manager = RebalanceManager.new
  @kafka = build_consumer
  # Marks if we need to offset. If we did not store offsets, we should not commit the offset
  # position as it will crash rdkafka
  @offsetting = false
  # We need to keep track of what we have paused for resuming
  # In case we lose a partition, we still need to resume it, otherwise it won't be fetched
  # again if we get reassigned to it later on. We need to keep them, as after revocation we
  # may no longer be able to fetch them from Kafka. We could rebuild them, but it is easier
  # to just keep them here and use them if needed when they cannot be obtained
  @paused_tpls = Hash.new { |h, k| h[k] = {} }
end
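For illustration only, a minimal construction sketch; `my_subscription_group` is a hypothetical variable standing for a subscription group instance built by Karafka's routing:

# Hypothetical sketch: `my_subscription_group` stands for a subscription group
# instance obtained from Karafka's routing; it is not defined here.
client = Karafka::Connection::Client.new(my_subscription_group)

client.id   # => random hex string, e.g. "a1b2c3d4e5f6" (SecureRandom.hex(6))
client.name # => underlying rdkafka consumer name, set once the consumer is built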
Instance Attribute Details
#id ⇒ String (readonly)
Returns the id of the client.
# File 'lib/karafka/connection/client.rb', line 18

def id
  @id
end
#name ⇒ String (readonly)
The consumer name may change in case we regenerate the consumer.
Returns underlying consumer name.
# File 'lib/karafka/connection/client.rb', line 15

def name
  @name
end
#rebalance_manager ⇒ Object (readonly)
Returns the value of attribute rebalance_manager.
# File 'lib/karafka/connection/client.rb', line 11

def rebalance_manager
  @rebalance_manager
end
Instance Method Details
#batch_poll ⇒ Karafka::Connection::MessagesBuffer
This method should not be executed from many threads at the same time
Fetches messages within boundaries defined by the settings (time, size, topics, etc).
# File 'lib/karafka/connection/client.rb', line 60

def batch_poll
  time_poll = TimeTrackers::Poll.new(@subscription_group.max_wait_time)

  @buffer.clear
  @rebalance_manager.clear

  loop do
    time_poll.start

    # Don't fetch more messages if we do not have any time left
    break if time_poll.exceeded?
    # Don't fetch more messages if we've fetched max as we've wanted
    break if @buffer.size >= @subscription_group.max_messages

    # Fetch message within our time boundaries
    message = poll(time_poll.remaining)

    # Put a message to the buffer if there is one
    @buffer << message if message

    # Upon polling rebalance manager might have been updated.
    # If partition revocation happens, we need to remove messages from revoked partitions
    # as well as ensure we do not have duplicates due to the offset reset for partitions
    # that we got assigned
    # We also do an early break, so the information about rebalance is used as soon as possible
    if @rebalance_manager.changed?
      break
    end

    # Track time spent on all of the processing and polling
    time_poll.checkpoint

    # Finally, once we've (potentially) removed revoked partitions, etc., if no messages were
    # returned we can break.
    # Worth keeping in mind that the rebalance manager might have been updated despite no
    # messages being returned during a poll
    break unless message
  end

  @buffer
end
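As a non-authoritative usage sketch, a polling loop on top of this method could look as follows; `client` is assumed to be an already built instance of this class:

# Hypothetical sketch: `client` is an already built Karafka::Connection::Client
loop do
  buffer = client.batch_poll

  # The client itself caps fetching based on the buffer size, so we use it here
  # to stop this sketch once a poll returned nothing
  break if buffer.size.zero?

  # ... hand the buffered raw messages over to the processing layer ...

  # Flush the offsets that were stored during processing
  client.commit_offsets
end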
#commit_offsets(async: true) ⇒ Boolean
This will commit all the offsets for the whole consumer. In order to achieve granular control over where the offset should be for particular topic partitions, store_offset should be used to store a new offset only when we want it to be flushed.
Commits the offsets on the current consumer in a non-blocking or blocking way, ignoring the case where there is no offset to commit (for example, when a rebalance occurs).
# File 'lib/karafka/connection/client.rb', line 122

def commit_offsets(async: true)
  @mutex.lock

  internal_commit_offsets(async: async)
ensure
  @mutex.unlock
end
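A sketch of the intended interplay between #store_offset and #commit_offsets; `client` and `message` are hypothetical variables coming from the consumption flow:

# Hypothetical sketch: store the position first, then decide how to flush it
client.store_offset(message)

client.commit_offsets                # non-blocking (async: true by default)
client.commit_offsets(async: false)  # blocking, same as #commit_offsets!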
#commit_offsets! ⇒ Object
Commits offsets in a synchronous way.
# File 'lib/karafka/connection/client.rb', line 133

def commit_offsets!
  commit_offsets(async: false)
end
#mark_as_consumed(message) ⇒ Boolean
This method won't trigger automatic offset commits; it relies instead on the offset check-pointing trigger that happens with each processed batch.
Marks a given message as consumed.
# File 'lib/karafka/connection/client.rb', line 239

def mark_as_consumed(message)
  store_offset(message)
end
#mark_as_consumed!(message) ⇒ Boolean
Marks a given message as consumed and commits the offsets in a blocking way.
# File 'lib/karafka/connection/client.rb', line 247

def mark_as_consumed!(message)
  return false unless mark_as_consumed(message)

  commit_offsets!
end
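The difference between the two marking methods, sketched with a hypothetical `message` from the current batch:

# Hypothetical sketch
client.mark_as_consumed(message)  # stores the offset only; flushed with the next offsets commit
client.mark_as_consumed!(message) # stores the offset and commits it in a blocking way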
#pause(topic, partition, offset) ⇒ Object
This will pause indefinitely and requires a manual `#resume`.
Pauses a given partition and moves back to the last successfully processed offset.
# File 'lib/karafka/connection/client.rb', line 156

def pause(topic, partition, offset)
  @mutex.lock

  # Do not pause if the client got closed, would not change anything
  return if @closed

  pause_msg = Messages::Seek.new(topic, partition, offset)

  internal_commit_offsets(async: true)

  # Here we do not use our cached tpls because we should not try to pause something we do
  # not own anymore.
  tpl = topic_partition_list(topic, partition)

  return unless tpl

  Karafka.monitor.instrument(
    'client.pause',
    caller: self,
    subscription_group: @subscription_group,
    topic: topic,
    partition: partition,
    offset: offset
  )

  @paused_tpls[topic][partition] = tpl

  @kafka.pause(tpl)

  @kafka.seek(pause_msg)
ensure
  @mutex.unlock
end
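An illustrative pause call; the topic name, partition and offset below are hypothetical:

# Hypothetical sketch: pause partition 0 of 'events' and move back to offset 100,
# so messages from that offset are fetched again once the partition is resumed
client.pause('events', 0, 100)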
#ping ⇒ Object
Runs a single poll, ignoring all potential errors. This is used as a keep-alive in the shutdown stage, and any errors that happen here are irrelevant from the shutdown process perspective.
This is used only to trigger rebalance callbacks.
# File 'lib/karafka/connection/client.rb', line 270

def ping
  poll(100)
rescue Rdkafka::RdkafkaError
  nil
end
#reset ⇒ Object
Closes and resets the client completely.
# File 'lib/karafka/connection/client.rb', line 254

def reset
  close

  @mutex.synchronize do
    @closed = false
    @offsetting = false
    @paused_tpls.clear
    @kafka = build_consumer
  end
end
#resume(topic, partition) ⇒ Object
Resumes processing of a given topic partition after it was paused.
# File 'lib/karafka/connection/client.rb', line 194

def resume(topic, partition)
  @mutex.lock

  return if @closed

  # We now commit offsets on rebalances, thus we can do it async just to make sure
  internal_commit_offsets(async: true)

  # If we were not able, let's try to reuse the one we have (if we have)
  tpl = topic_partition_list(topic, partition) || @paused_tpls[topic][partition]

  return unless tpl

  # If we did not have it, it means we never paused this partition, thus no resume should
  # happen in the first place
  return unless @paused_tpls[topic].delete(partition)

  Karafka.monitor.instrument(
    'client.resume',
    caller: self,
    subscription_group: @subscription_group,
    topic: topic,
    partition: partition
  )

  @kafka.resume(tpl)
ensure
  @mutex.unlock
end
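And the matching resume call for the partition paused in the sketch above (hypothetical values):

# Hypothetical sketch: resume fetching of partition 0 of 'events'
client.resume('events', 0)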
#seek(message) ⇒ Object
Seek to a particular message. The next poll on the topic/partition will return the message at the given offset.
# File 'lib/karafka/connection/client.rb', line 141

def seek(message)
  @mutex.lock

  @kafka.seek(message)
ensure
  @mutex.unlock
end
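A seek sketch. #pause above builds a Messages::Seek struct for its internal seek, so the same type of object is assumed to be a valid argument here; topic, partition and offset are hypothetical:

# Hypothetical sketch: the next poll on 'events' partition 0 should start at offset 42
seek_message = Karafka::Messages::Seek.new('events', 0, 42)
client.seek(seek_message)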
#stop ⇒ Object
Stopping running consumers without a really important reason is not recommended, as until all the consumers are stopped, the server will keep running while serving only part of the messages.
Gracefully stops topic consumption.
# File 'lib/karafka/connection/client.rb', line 229

def stop
  close
end
#store_offset(message) ⇒ Object
Stores offset for a given partition of a given topic based on the provided message.
# File 'lib/karafka/connection/client.rb', line 106

def store_offset(message)
  @mutex.synchronize do
    internal_store_offset(message)
  end
end