Class: Karafka::Connection::Client

Inherits:
Object
  • Object
Includes:
Karafka::Core::Helpers::Time
Defined in:
lib/karafka/connection/client.rb

Overview

An abstraction layer on top of the rdkafka consumer.

It is thread-safe and provides safety measures so that we do not end up operating on a closed consumer instance, which would crash the Ruby VM process.


Constructor Details

#initialize(subscription_group, batch_poll_breaker) ⇒ Karafka::Connection::Client

Creates a new consumer instance.

Parameters:

  • subscription_group (Karafka::Routing::SubscriptionGroup)

    subscription group with all the configuration details needed for us to create a client

  • batch_poll_breaker (Proc)

    proc that, when it evaluates to false, causes the batch poll loop to finish early. This improves the shutdown and dynamic multiplication, as it allows us to break early out of long polls.



# File 'lib/karafka/connection/client.rb', line 61

def initialize(subscription_group, batch_poll_breaker)
  @id = SecureRandom.hex(6)
  # Name is set when we build consumer
  @name = ''
  @closed = false
  @subscription_group = subscription_group
  @buffer = RawMessagesBuffer.new
  @tick_interval = ::Karafka::App.config.internal.tick_interval
  @rebalance_manager = RebalanceManager.new(@subscription_group.id, @buffer)
  @rebalance_callback = Instrumentation::Callbacks::Rebalance.new(@subscription_group)

  @interval_runner = Helpers::IntervalRunner.new do
    events_poll
    # events poller returns nil when not running often enough, hence we don't use the
    # boolean to be explicit
    batch_poll_breaker.call ? :run : :stop
  end

  # There are a few operations that can happen in parallel from the listener threads as well
  # as from the workers. They are not fully thread-safe because they may be composed of
  # several calls to Kafka or of several internal state changes. That is why we mutex them.
  # It mostly revolves around pausing and resuming.
  @mutex = Mutex.new
  # We need to keep track of what we have paused for resuming
  # In case we lose a partition, we still need to resume it, otherwise it won't be fetched
  # again if we get reassigned to it later on. We need to keep them as after revocation we
  # may no longer be able to fetch them from Kafka. We could build them but it is easier
  # to just keep them here and use them if needed when they cannot be obtained
  @paused_tpls = Hash.new { |h, k| h[k] = {} }
end
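
The interval runner block above returns `:run` or `:stop` rather than a boolean because a throttled call yields `nil`, which would otherwise be indistinguishable from a falsy "stop". A minimal sketch of that idea follows. `ThrottledRunner` is a hypothetical, simplified stand-in and not Karafka's actual `Helpers::IntervalRunner`; the millisecond interval and the `reset` behavior are assumptions made for illustration.

```ruby
# Hypothetical, simplified stand-in for an interval runner. Karafka's real
# Helpers::IntervalRunner may differ in naming and behavior.
class ThrottledRunner
  # @param interval_ms [Integer] minimum number of milliseconds between runs
  def initialize(interval_ms, &block)
    @interval_ms = interval_ms
    @block = block
    @last_run_at = nil
  end

  # Runs the block only when enough time has passed since the previous run.
  # @return [Object, nil] block result, or nil when the call was throttled
  def call
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
    return nil if @last_run_at && (now - @last_run_at) < @interval_ms

    @last_run_at = now
    @block.call
  end

  # Forgets the last run so the next call executes immediately
  def reset
    @last_run_at = nil
  end
end

keep_polling = true
runner = ThrottledRunner.new(60_000) { keep_polling ? :run : :stop }

first = runner.call   # executes the block
second = runner.call  # throttled, the block is not executed
keep_polling = false
runner.reset
third = runner.call   # executes again after the reset
```

With symbols, a caller comparing the result against `:stop` keeps polling on a throttled `nil` instead of stopping by accident.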

Instance Attribute Details

#id ⇒ String (readonly)

Returns id of the client.

Returns:

  • (String)

    id of the client



# File 'lib/karafka/connection/client.rb', line 24

def id
  @id
end

#name ⇒ String (readonly)

Note:

The consumer name may change if we regenerate it.

Returns underlying consumer name.

Returns:

  • (String)

    underlying consumer name



# File 'lib/karafka/connection/client.rb', line 21

def name
  @name
end

#rebalance_manager ⇒ Object (readonly)

Returns the value of attribute rebalance_manager.



# File 'lib/karafka/connection/client.rb', line 13

def rebalance_manager
  @rebalance_manager
end

#subscription_group ⇒ Karafka::Routing::SubscriptionGroup (readonly)

Returns the subscription group to which this client belongs.

Returns:

  • (Karafka::Routing::SubscriptionGroup)

    subscription group to which this client belongs



# File 'lib/karafka/connection/client.rb', line 17

def subscription_group
  @subscription_group
end

Instance Method Details

#assignment ⇒ Rdkafka::Consumer::TopicPartitionList

Returns current active assignment.

Returns:

  • (Rdkafka::Consumer::TopicPartitionList)

    current active assignment



# File 'lib/karafka/connection/client.rb', line 179

def assignment
  kafka.assignment
end

#assignment_lost? ⇒ Boolean

Returns true if our current assignment has been lost involuntarily.

Returns:

  • (Boolean)

    true if our current assignment has been lost involuntarily.



# File 'lib/karafka/connection/client.rb', line 174

def assignment_lost?
  kafka.assignment_lost?
end

#batch_poll ⇒ Karafka::Connection::MessagesBuffer

Note:

This method should not be executed from multiple threads at the same time.

Fetches messages within boundaries defined by the settings (time, size, topics, etc).

Also periodically runs the events polling to trigger events callbacks.

Returns:

  • (Karafka::Connection::MessagesBuffer)



# File 'lib/karafka/connection/client.rb', line 99

def batch_poll
  time_poll = TimeTrackers::Poll.new(@subscription_group.max_wait_time)

  @buffer.clear
  @rebalance_manager.clear

  events_poll

  loop do
    time_poll.start

    # Don't fetch more messages if we do not have any time left
    break if time_poll.exceeded?
    # Don't fetch more messages if we've fetched max that we've wanted
    break if @buffer.size >= @subscription_group.max_messages

    # Fetch message within our time boundaries
    response = poll(time_poll.remaining)

    # We track when last polling happened so we can provide means to detect upcoming
    # `max.poll.interval.ms` limit
    @buffer.polled

    case response
    when :tick_time
      nil
    # We get a hash only in case of eof error
    when Hash
      @buffer.eof(response[:topic], response[:partition])
    when nil
      nil
    else
      @buffer << response
    end

    # Upon polling, the rebalance manager might have been updated.
    # If partition revocation happens, we need to remove messages from revoked partitions
    # as well as ensure we do not have duplicates due to the offset reset for partitions
    # that we got assigned
    #
    # We also do early break, so the information about rebalance is used as soon as possible
    if @rebalance_manager.changed?
      # Since rebalances do not occur often, we can run events polling as well without
      # any throttling
      events_poll

      break
    end

    # If we were signaled from the outside to break the loop, we should
    break if @interval_runner.call == :stop

    # Track time spent on all of the processing and polling
    time_poll.checkpoint

    # Finally, once we've (potentially) removed revoked, etc, if no messages were returned
    # and it was not an early poll exit, we can break. We also break if we got the eof
    # signaling to propagate it asap
    # Worth keeping in mind that the rebalance manager might have been updated despite no
    # messages being returned during a poll
    break if response.nil? || response.is_a?(Hash)
  end

  @buffer
end
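
The loop above bounds each batch by a time budget and by a message count, and stops as soon as a poll returns nothing. The following is a minimal sketch of just that control flow, leaving out rebalance checks, EOF handling, and events polling. `FakeSource` and `bounded_batch` are hypothetical names used only for illustration and are not part of Karafka.

```ruby
# Hypothetical stand-in for a poll source: each call returns a message or nil
FakeSource = Struct.new(:queue) do
  def poll(_timeout_ms)
    queue.shift
  end
end

# Collects messages until the time budget or the size limit is reached, or
# until a poll returns nothing.
def bounded_batch(source, max_wait_ms:, max_messages:)
  buffer = []
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) + max_wait_ms

  loop do
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)

    # Do not fetch more if no time is left
    break if remaining <= 0
    # Do not fetch more if we already have as many messages as we wanted
    break if buffer.size >= max_messages

    message = source.poll(remaining)
    # Nothing returned within the boundaries, finish the batch
    break if message.nil?

    buffer << message
  end

  buffer
end

source = FakeSource.new(%w[a b c d e])
first_batch = bounded_batch(source, max_wait_ms: 1_000, max_messages: 3)
second_batch = bounded_batch(source, max_wait_ms: 1_000, max_messages: 3)
```

Passing the remaining time into each poll keeps the whole batch, not just a single poll, within the budget.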

#commit_offsets(async: true) ⇒ Boolean

Note:

This will commit all the offsets for the whole consumer. To achieve granular control over where the offset should be for particular topic partitions, use `store_offset` to store only the new offsets that we want to be flushed.

Note:

When async, this method may return `true` despite an involuntary partition revocation, as it does not resolve to `assignment_lost?`. It only returns the result of the commit operation.

Commits the offsets on the current consumer in a non-blocking or blocking way.

Parameters:

  • async (Boolean) (defaults to: true)

    should the commit happen async or sync (async by default)

Returns:

  • (Boolean)

    whether the commit was successful. It may fail when we no longer own the given partition.



# File 'lib/karafka/connection/client.rb', line 196

def commit_offsets(async: true)
  internal_commit_offsets(async: async)
end

#commit_offsets! ⇒ Object

Commits offset in a synchronous way.

See Also:

  • #commit_offsets for more details


# File 'lib/karafka/connection/client.rb', line 203

def commit_offsets!
  commit_offsets(async: false)
end

#committed(tpl = nil) ⇒ Rdkafka::Consumer::TopicPartitionList

Note:

Since this call is synchronous, it is recommended to use it only on rebalances, to get positions with metadata when working with metadata.

Returns the current committed offset per partition for this consumer group. The offset field of each requested partition will be set either to the stored offset or to -1001 if there was no stored offset for that partition.

Parameters:

  • tpl (Rdkafka::Consumer::TopicPartitionList) (defaults to: nil)

    topic partition list for which we want to get the committed offsets

Returns:

  • (Rdkafka::Consumer::TopicPartitionList)

Raises:

  • (Rdkafka::RdkafkaError)

    When getting the committed positions fails.



# File 'lib/karafka/connection/client.rb', line 417

def committed(tpl = nil)
  @wrapped_kafka.committed(tpl)
end
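
Callers of `#committed` need to treat -1001 as "nothing stored yet" rather than as a real position. A minimal sketch of that check follows. `PartitionInfo`, `resume_offset`, and the fallback of 0 are hypothetical and only stand in for whatever the real partition objects and reset policy look like.

```ruby
# -1001 is the value described above for "no stored offset"
NO_STORED_OFFSET = -1001

# Hypothetical, simplified partition record used only for this sketch
PartitionInfo = Struct.new(:partition, :offset)

# Returns the offset to continue from, falling back when nothing was committed.
# The fallback of 0 is an assumption made for illustration.
def resume_offset(info, fallback: 0)
  info.offset == NO_STORED_OFFSET ? fallback : info.offset
end

committed = [PartitionInfo.new(0, 42), PartitionInfo.new(1, NO_STORED_OFFSET)]
starts = committed.to_h { |info| [info.partition, resume_offset(info)] }
```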

#consumer_group_metadata_pointer ⇒ FFI::Pointer

Returns a pointer to the consumer group metadata. It is used only in the context of exactly-once semantics in transactions, which is why it is never remapped to Ruby.

Returns:

  • (FFI::Pointer)


# File 'lib/karafka/connection/client.rb', line 404

def consumer_group_metadata_pointer
  kafka.consumer_group_metadata_pointer
end

#events_poll(timeout = 0) ⇒ Object

Note:

It is non-blocking when the timeout is 0 and will not wait if the queue is empty. It costs up to 2ms when no callbacks are triggered.

Triggers the rdkafka main queue events by consuming this queue. This is not the consumer consumption queue but the one with:

- error callbacks
- stats callbacks
- OAUTHBEARER token refresh callbacks

Parameters:

  • timeout (Integer) (defaults to: 0)

    number of milliseconds to wait on events or 0 not to wait.



# File 'lib/karafka/connection/client.rb', line 397

def events_poll(timeout = 0)
  kafka.events_poll(timeout)
end

#mark_as_consumed(message, metadata = nil) ⇒ Boolean

Note:

This method won't trigger automatic offset commits; it relies instead on the offset checkpointing that happens with each processed batch. It will, however, check the `librdkafka` assignment ownership to increase accuracy for involuntary revocations.

Marks given message as consumed.

Parameters:

  • message (Karafka::Messages::Message)

    message that we want to mark as processed

  • metadata (String, nil) (defaults to: nil)

    offset storage metadata or nil if none

Returns:

  • (Boolean)

    true if successful, false if we no longer own the given partition



# File 'lib/karafka/connection/client.rb', line 345

def mark_as_consumed(message, metadata = nil)
  store_offset(message, metadata) && !assignment_lost?
end

#mark_as_consumed!(message, metadata = nil) ⇒ Boolean

Marks a given message as consumed and commits the offsets in a blocking way.

Parameters:

  • message (Karafka::Messages::Message)

    message that we want to mark as processed

  • metadata (String, nil) (defaults to: nil)

    offset storage metadata or nil if none

Returns:

  • (Boolean)

    true if successful, false if we no longer own the given partition



# File 'lib/karafka/connection/client.rb', line 354

def mark_as_consumed!(message, metadata = nil)
  return false unless mark_as_consumed(message, metadata)

  commit_offsets!
end
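
The return values of `#mark_as_consumed` and `#mark_as_consumed!` combine two facts: the offset was stored, and the assignment is still owned. A minimal sketch of that logic with a stub follows. `StubClient` is hypothetical, never talks to Kafka, and only mirrors the boolean flow of the two methods above.

```ruby
# Hypothetical stub that mimics only the return-value logic shown above
class StubClient
  attr_reader :stored, :commits

  def initialize(lost: false)
    @lost = lost
    @stored = []
    @commits = 0
  end

  def assignment_lost?
    @lost
  end

  # Records the offset locally; always succeeds in this sketch
  def store_offset(message, metadata = nil)
    @stored << [message, metadata]
    true
  end

  # Counts blocking commits; always succeeds in this sketch
  def commit_offsets!
    @commits += 1
    true
  end

  def mark_as_consumed(message, metadata = nil)
    store_offset(message, metadata) && !assignment_lost?
  end

  def mark_as_consumed!(message, metadata = nil)
    return false unless mark_as_consumed(message, metadata)

    commit_offsets!
  end
end

owned = StubClient.new
revoked = StubClient.new(lost: true)

owned_result = owned.mark_as_consumed!(:msg)
revoked_result = revoked.mark_as_consumed!(:msg)
```

In this sketch the revoked client still stores the offset but reports `false` and skips the blocking commit, which is the signal a caller would use to stop processing that partition.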

#pause(topic, partition, offset = nil) ⇒ Object

Note:

This will pause indefinitely and requires a manual `#resume`.

Note:

When `#internal_seek` is not involved (when offset is `nil`), we will not purge the librdkafka buffers and will continue from the last cursor offset.

Pauses the given partition and moves back to the last successfully processed offset.

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition

  • offset (Integer, nil) (defaults to: nil)

    offset of the message on which we want to pause (this message will be reprocessed after getting back to processing) or nil if we want to pause and resume from the consecutive offset (+1 from the last message passed to us by librdkafka)



# File 'lib/karafka/connection/client.rb', line 227

def pause(topic, partition, offset = nil)
  @mutex.synchronize do
    # Do not pause if the client got closed, would not change anything
    return if @closed

    internal_commit_offsets(async: true)

    # Here we do not use our cached tpls because we should not try to pause something we do
    # not own anymore.
    tpl = topic_partition_list(topic, partition)

    return unless tpl

    Karafka.monitor.instrument(
      'client.pause',
      caller: self,
      subscription_group: @subscription_group,
      topic: topic,
      partition: partition,
      offset: offset
    )

    @paused_tpls[topic][partition] = tpl

    kafka.pause(tpl)

    # If offset is not provided, will pause where it finished.
    # This makes librdkafka not purge buffers and can provide significant network savings
    # when we just want to pause before further processing without changing the offsets
    return unless offset

    pause_msg = Messages::Seek.new(topic, partition, offset)

    internal_seek(pause_msg)
  end
end

#ping ⇒ Object

Runs a single poll on the main queue and the consumer queue, ignoring all potential errors. This is used as a keep-alive in the shutdown stage, and any errors that happen here are irrelevant from the perspective of the shutdown process.

This is used only to trigger rebalance callbacks and other callbacks.



# File 'lib/karafka/connection/client.rb', line 380

def ping
  events_poll(100)
  poll(100)
rescue Rdkafka::RdkafkaError
  nil
end

#reset ⇒ Object

Closes and resets the client completely.



# File 'lib/karafka/connection/client.rb', line 361

def reset
  Karafka.monitor.instrument(
    'client.reset',
    caller: self,
    subscription_group: @subscription_group
  ) do
    close

    @interval_runner.reset
    @closed = false
    @paused_tpls.clear
  end
end

#resume(topic, partition) ⇒ Object

Resumes processing of a given topic partition after it was paused.

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition



# File 'lib/karafka/connection/client.rb', line 268

def resume(topic, partition)
  @mutex.synchronize do
    return if @closed

    # We now commit offsets on rebalances, thus we can do it async just to make sure
    internal_commit_offsets(async: true)

    # If we were not able, let's try to reuse the one we have (if we have)
    tpl = topic_partition_list(topic, partition) || @paused_tpls[topic][partition]

    return unless tpl

    # If we did not have it, it means we never paused this partition, thus no resume should
    # happen in the first place
    return unless @paused_tpls[topic].delete(partition)

    Karafka.monitor.instrument(
      'client.resume',
      caller: self,
      subscription_group: @subscription_group,
      topic: topic,
      partition: partition
    )

    kafka.resume(tpl)
  end
end
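
Both `#pause` and `#resume` rely on the nested `@paused_tpls` hash, which is built with a default block so that a topic entry is created on first access. A minimal sketch of that bookkeeping in plain Ruby follows. The symbols stand in for real topic partition lists, and the topic names are made up for illustration.

```ruby
# Auto-vivifying hash: accessing an unknown topic creates its partitions hash
paused = Hash.new { |h, k| h[k] = {} }

# Remember what was paused (the value stands in for a topic partition list)
paused['events'][0] = :tpl_events_0

# Resuming a partition that was paused removes and returns its entry
resumed = paused['events'].delete(0)

# Resuming something that was never paused yields nil, so the caller can bail out
never_paused = paused['orders'].delete(3)
```

Because `delete` returns `nil` for an unknown partition, a guard like `return unless @paused_tpls[topic].delete(partition)` doubles as the "was this ever paused?" check.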

#seek(message) ⇒ Object

Note:

If you are seeking to a time offset, getting the offset is blocking.

Seek to a particular message. The next poll on the topic/partition will return the message at the given offset.

Parameters:



# File 'lib/karafka/connection/client.rb', line 213

def seek(message)
  @mutex.synchronize { internal_seek(message) }
end

#stop ⇒ Object

Gracefully stops topic consumption.



# File 'lib/karafka/connection/client.rb', line 297

def stop
  # librdkafka has several constant issues when shutting down during rebalance. This is
  # an issue that gets back every few versions of librdkafka in a limited scope, for example
  # for cooperative-sticky or in a general scope. This is why we unsubscribe and wait until
  # we no longer have any assignments. That way librdkafka consumer shutdown should never
  # happen with rebalance associated with the given consumer instance. Since we do not want
  # to wait forever, we also impose a limit on how long should we wait. This prioritizes
  # shutdown stability over endless wait.
  #
  # The `@unsubscribing` ensures that when there would be a direct close attempt, it
  # won't get into this loop again. This can happen when supervision decides it should close
  # things faster
  #
  # @see https://github.com/confluentinc/librdkafka/issues/4792
  # @see https://github.com/confluentinc/librdkafka/issues/4527
  if unsubscribe?
    @unsubscribing = true

    # Give 50% of time for the final close before we reach the forceful
    max_wait = ::Karafka::App.config.shutdown_timeout * COOP_UNSUBSCRIBE_FACTOR
    used = 0
    stopped_at = monotonic_now

    unsubscribe

    until assignment.empty?
      used += monotonic_now - stopped_at
      stopped_at = monotonic_now

      break if used >= max_wait

      sleep(0.1)

      ping
    end
  end

  close
end
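
The unsubscribe loop above is an instance of a bounded wait: keep checking a condition, but give up once a time budget measured on a monotonic clock is used up. A minimal sketch of that pattern follows. `now_ms` and `wait_until` are hypothetical helpers, with `now_ms` standing in for the `monotonic_now` helper, and the step length is an assumption.

```ruby
# Current monotonic time in milliseconds (stand-in for `monotonic_now`)
def now_ms
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
end

# Waits until the block returns true or the budget is used up.
# @return [Boolean] true if the condition was met in time, false otherwise
def wait_until(max_wait_ms:, step_s: 0.01)
  used = 0
  checked_at = now_ms

  until yield
    # Accumulate the time spent since the previous check
    used += now_ms - checked_at
    checked_at = now_ms

    return false if used >= max_wait_ms

    sleep(step_s)
  end

  true
end

attempts = 0
met = wait_until(max_wait_ms: 1_000) { (attempts += 1) >= 3 }
timed_out = wait_until(max_wait_ms: 30) { false }
```

A monotonic clock is used instead of wall-clock time so that system clock adjustments cannot shorten or extend the wait.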

#store_offset(message, offset_metadata = nil) ⇒ Object

Stores offset for a given partition of a given topic based on the provided message.

Parameters:

  • message (Karafka::Messages::Message)
  • offset_metadata (String, nil) (defaults to: nil)

    offset storage metadata or nil if none



# File 'lib/karafka/connection/client.rb', line 169

def store_offset(message, offset_metadata = nil)
  internal_store_offset(message, offset_metadata)
end