Class: Karafka::Connection::Client

Inherits:
Object
Defined in:
lib/karafka/connection/client.rb

Overview

An abstraction layer on top of the rdkafka consumer.

It is thread-safe and provides safety measures so we do not end up operating on a closed consumer instance, as that causes the Ruby VM process to crash.

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(subscription_group) ⇒ Karafka::Connection::Client

Creates a new consumer instance.

Parameters:

  • subscription_group (Karafka::Routing::SubscriptionGroup)

    subscription group for which this client will operate



# File 'lib/karafka/connection/client.rb', line 39

def initialize(subscription_group)
  @id = SecureRandom.hex(6)
  # Name is set when we build consumer
  @name = ''
  @closed = false
  @subscription_group = subscription_group
  @buffer = RawMessagesBuffer.new
  @rebalance_manager = RebalanceManager.new
  @kafka = build_consumer
  # There are a few operations that can happen in parallel from the listener threads as
  # well as from the workers. They are not fully thread-safe because they may be composed
  # of several calls to Kafka or of several internal state changes. That is why we guard
  # them with a mutex. It mostly revolves around pausing and resuming.
  @mutex = Mutex.new
  # We need to keep track of what we have paused for resuming.
  # In case we lose a partition, we still need to resume it, otherwise it won't be fetched
  # again if we get reassigned to it later on. We keep the TPLs because after a revocation
  # we may no longer be able to fetch them from Kafka. We could rebuild them, but it is
  # easier to just keep them here and use them when they cannot be obtained.
  @paused_tpls = Hash.new { |h, k| h[k] = {} }
end
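For illustration only, a minimal sketch of constructing a client directly; in a real process Karafka's listeners build and manage these instances, and `subscription_group` is assumed to be a Karafka::Routing::SubscriptionGroup taken from a booted application's routing.

# Illustrative sketch, not how applications normally obtain a client.
# `subscription_group` is assumed to come from the framework routing.
client = Karafka::Connection::Client.new(subscription_group)

client.id   # => random hex identifier generated in the constructor
client.name # => underlying consumer name once it has been set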

Instance Attribute Details

#id ⇒ String (readonly)

Returns id of the client.

Returns:

  • (String)

    id of the client



# File 'lib/karafka/connection/client.rb', line 18

def id
  @id
end

#name ⇒ String (readonly)

Note:

Consumer name may change in case we regenerate it

Returns underlying consumer name.

Returns:

  • (String)

    underlying consumer name



# File 'lib/karafka/connection/client.rb', line 15

def name
  @name
end

#rebalance_manager ⇒ Object (readonly)

Returns the value of attribute rebalance_manager.



# File 'lib/karafka/connection/client.rb', line 11

def rebalance_manager
  @rebalance_manager
end

Instance Method Details

#assignment_lost? ⇒ Boolean

Returns true if our current assignment has been lost involuntarily.

Returns:

  • (Boolean)

    true if our current assignment has been lost involuntarily.



# File 'lib/karafka/connection/client.rb', line 117

def assignment_lost?
  @kafka.assignment_lost?
end
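A hedged usage sketch of guarding offset management on assignment ownership; `client` and `message` are assumed to come from Karafka internals.

# Skip offset work when the assignment was taken away involuntarily;
# marking offsets would be meaningless for partitions we no longer own.
client.mark_as_consumed(message) unless client.assignment_lost?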

#batch_poll ⇒ Karafka::Connection::MessagesBuffer

Note:

This method should not be executed from many threads at the same time

Fetches messages within boundaries defined by the settings (time, size, topics, etc).

Returns:

  • (Karafka::Connection::MessagesBuffer)

    buffer with the polled raw messages



# File 'lib/karafka/connection/client.rb', line 66

def batch_poll
  time_poll = TimeTrackers::Poll.new(@subscription_group.max_wait_time)

  @buffer.clear
  @rebalance_manager.clear

  loop do
    time_poll.start

    # Don't fetch more messages if we do not have any time left
    break if time_poll.exceeded?
    # Don't fetch more messages if we've already fetched as many as we wanted
    break if @buffer.size >= @subscription_group.max_messages

    # Fetch a message within our time boundaries
    message = poll(time_poll.remaining)

    # Put the message into the buffer if there is one
    @buffer << message if message

    # Upon polling, the rebalance manager might have been updated.
    # If a partition revocation happened, we need to remove messages from the revoked
    # partitions, as well as ensure we do not have duplicates due to the offset reset for
    # partitions that we got assigned.
    # We also break early, so the information about the rebalance is used as soon as possible
    if @rebalance_manager.changed?
      remove_revoked_and_duplicated_messages
      break
    end

    # Track time spent on all of the processing and polling
    time_poll.checkpoint

    # Finally, once we've (potentially) removed revoked messages etc., we can break if no
    # messages were returned.
    # Worth keeping in mind that the rebalance manager might have been updated despite no
    # messages being returned during a poll
    break unless message
  end

  @buffer
end
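For orientation, a hedged sketch of how this method is meant to be driven; in a real process Karafka::Connection::Listener owns this loop, so do not poll a framework-managed client yourself.

# One fetch cycle within the configured time and size boundaries.
buffer = client.batch_poll

# The same buffer object is reused and cleared on every call, so consume its
# contents (raw messages grouped per topic partition) before polling again.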

#commit_offsets(async: true) ⇒ Boolean

Note:

This will commit all the offsets for the whole consumer. In order to achieve granular control over where the offset should be for particular topic partitions, #store_offset should be used to store a new offset only when we want it to be flushed.

Note:

In its async mode, this method may return `true` despite an involuntary partition revocation, as it does not consult `#assignment_lost?`. It returns only the result of the commit operation itself.

Commits the offset on a current consumer in a non-blocking or blocking way.

Parameters:

  • async (Boolean) (defaults to: true)

    should the commit happen async or sync (async by default)

Returns:

  • (Boolean)

    whether committing was successful. It may not be when we no longer own the given partition.



# File 'lib/karafka/connection/client.rb', line 134

def commit_offsets(async: true)
  internal_commit_offsets(async: async)
end
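A sketch of the granular pattern the first note describes: store offsets only for the messages that were fully handled, then flush them in one commit. Variable names here are illustrative.

messages.each do |message|
  # ... processing ...
  client.store_offset(message)
end

client.commit_offsets                # non-blocking (default)
client.commit_offsets(async: false)  # or blocking, equivalent to #commit_offsets!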

#commit_offsets! ⇒ Object

Commits offset in a synchronous way.

See Also:

  • #commit_offsets for more details


# File 'lib/karafka/connection/client.rb', line 141

def commit_offsets!
  commit_offsets(async: false)
end

#mark_as_consumed(message) ⇒ Boolean

Note:

This method won't trigger automatic offset commits, relying instead on the offset check-pointing trigger that happens with each processed batch. It will, however, check the `librdkafka` assignment ownership to increase accuracy for involuntary revocations.

Marks the given message as consumed.

Parameters:

  • message (Karafka::Messages::Message)

    message that we want to mark as consumed

Returns:

  • (Boolean)

    true if successful, false if we no longer own the given partition



# File 'lib/karafka/connection/client.rb', line 271

def mark_as_consumed(message)
  store_offset(message) && !assignment_lost?
end

#mark_as_consumed!(message) ⇒ Boolean

Marks a given message as consumed and commits the offsets in a blocking way.

Parameters:

  • message (Karafka::Messages::Message)

    message that we want to mark as consumed and commit

Returns:

  • (Boolean)

    true if successful, false if we no longer own the given partition



# File 'lib/karafka/connection/client.rb', line 279

def mark_as_consumed!(message)
  return false unless mark_as_consumed(message)

  commit_offsets!
end
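A sketch of the difference between the two variants; `message` is assumed to be a Karafka::Messages::Message.

# Stores the offset; it will be committed with the next offset checkpointing.
client.mark_as_consumed(message)

# Stores the offset and synchronously commits all stored offsets right away.
client.mark_as_consumed!(message)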

#pause(topic, partition, offset) ⇒ Object

Note:

This will pause indefinitely and requires a manual `#resume`

Pauses the given partition and moves back to the last successfully processed offset.

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition

  • offset (Integer)

    offset of the message on which we want to pause (this message will be reprocessed after getting back to processing)



# File 'lib/karafka/connection/client.rb', line 162

def pause(topic, partition, offset)
  @mutex.synchronize do
    # Do not pause if the client got closed, would not change anything
    return if @closed

    pause_msg = Messages::Seek.new(topic, partition, offset)

    internal_commit_offsets(async: true)

    # Here we do not use our cached tpls because we should not try to pause something we do
    # not own anymore.
    tpl = topic_partition_list(topic, partition)

    return unless tpl

    Karafka.monitor.instrument(
      'client.pause',
      caller: self,
      subscription_group: @subscription_group,
      topic: topic,
      partition: partition,
      offset: offset
    )

    @paused_tpls[topic][partition] = tpl

    @kafka.pause(tpl)
    internal_seek(pause_msg)
  end
end
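A hedged sketch of pausing at a message we want to retry later; since the pause is indefinite, a matching #resume call is required (see the example after #resume below). `message` is assumed to be a Karafka::Messages::Message.

# Pause the partition and rewind to the message's offset so it is reprocessed
# once the partition is resumed.
client.pause(message.topic, message.partition, message.offset)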

#ping ⇒ Object

Runs a single poll, ignoring all potential errors. This is used as a keep-alive in the shutdown stage; any errors that happen here are irrelevant from the shutdown process perspective.

It is used only to trigger rebalance callbacks.



# File 'lib/karafka/connection/client.rb', line 299

def ping
  poll(100)
rescue Rdkafka::RdkafkaError
  nil
end

#reset ⇒ Object

Closes and resets the client completely.



# File 'lib/karafka/connection/client.rb', line 286

def reset
  close

  @closed = false
  @paused_tpls.clear
  @kafka = build_consumer
end

#resume(topic, partition) ⇒ Object

Resumes processing of a given topic partition after it was paused.

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition



# File 'lib/karafka/connection/client.rb', line 197

def resume(topic, partition)
  @mutex.synchronize do
    return if @closed

    # We now commit offsets on rebalances, thus we can do it async just to make sure
    internal_commit_offsets(async: true)

    # If we cannot fetch a fresh TPL, let's try to reuse the one we cached (if we have it)
    tpl = topic_partition_list(topic, partition) || @paused_tpls[topic][partition]

    return unless tpl

    # If we did not have it, it means we never paused this partition, thus no resume should
    # happen in the first place
    return unless @paused_tpls[topic].delete(partition)

    Karafka.monitor.instrument(
      'client.resume',
      caller: self,
      subscription_group: @subscription_group,
      topic: topic,
      partition: partition
    )

    @kafka.resume(tpl)
  end
end
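The counterpart of the pause sketch above.

# Resume fetching from the previously paused topic partition.
client.resume(message.topic, message.partition)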

#seek(message) ⇒ Object

Note:

Please note that if you are seeking to a time-based offset, resolving the offset is a blocking operation.

Seek to a particular message. The next poll on the topic/partition will return the message at the given offset.

Parameters:

  • message (Karafka::Messages::Message, Karafka::Messages::Seek)

    message (or seek request) to which we want to seek



# File 'lib/karafka/connection/client.rb', line 151

def seek(message)
  @mutex.synchronize { internal_seek(message) }
end
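A hedged sketch using the same seek struct the client builds internally for #pause; the topic name, partition, and offset are placeholders.

# Move the consumer position for topic 'events', partition 0, to offset 100.
seek_target = Karafka::Messages::Seek.new('events', 0, 100)
client.seek(seek_target)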

#stop ⇒ Object

Note:

Stopping running consumers without a really important reason is not recommended, as until all the consumers are stopped, the server will keep running, serving only part of the messages.

Gracefully stops topic consumption.



# File 'lib/karafka/connection/client.rb', line 230

def stop
  # This ensures that we do not stop the underlying client until it passes the first
  # rebalance for cooperative-sticky. Otherwise librdkafka may crash
  #
  # We set a timeout just in case the rebalance would never happen or would last for an
  # extensive time period.
  #
  # @see https://github.com/confluentinc/librdkafka/issues/4312
  if @subscription_group.kafka[:'partition.assignment.strategy'] == 'cooperative-sticky'
    active_wait = false

    (COOPERATIVE_STICKY_MAX_WAIT / 100).times do
      # If we're past the first rebalance, no need to wait
      if @rebalance_manager.active?
        # We give it a bit of time because librdkafka has a tendency to do some post-callback
        # work that from its perspective is still under rebalance
        sleep(5) if active_wait

        break
      end

      active_wait = true

      # poll to trigger potential rebalances that could occur during stopping and to trigger
      # potential callbacks
      poll(100)

      sleep(0.1)
    end
  end

  close
end
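For context on the cooperative-sticky branch above, a sketch of a Karafka setup that enables this assignment strategy; the broker address is a placeholder.

class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = {
      'bootstrap.servers': 'localhost:9092',
      'partition.assignment.strategy': 'cooperative-sticky'
    }
  end
end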

#store_offset(message) ⇒ Object

Stores offset for a given partition of a given topic based on the provided message.

Parameters:

  • message (Karafka::Messages::Message)

    message from which we want to store the offset



# File 'lib/karafka/connection/client.rb', line 112

def store_offset(message)
  internal_store_offset(message)
end