Class: Karafka::Connection::Client

Inherits:
Object
Defined in:
lib/karafka/connection/client.rb

Overview

An abstraction layer on top of the rdkafka consumer.

It is thread-safe and provides safety measures so we do not end up operating on a closed consumer instance, as that would crash the Ruby VM process.

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(subscription_group) ⇒ Karafka::Connection::Rdk::Consumer

Creates a new consumer instance.

Parameters:



# File 'lib/karafka/connection/client.rb', line 34

def initialize(subscription_group)
  @id = SecureRandom.hex(6)
  # Name is set when we build consumer
  @name = ''
  @mutex = Mutex.new
  @closed = false
  @subscription_group = subscription_group
  @buffer = RawMessagesBuffer.new
  @rebalance_manager = RebalanceManager.new
  @kafka = build_consumer
  # Marks if we need to commit offsets. If we did not store any offsets, we should not
  # commit the offset position as it will crash rdkafka
  @offsetting = false
  # We need to keep track of what we have paused for resuming.
  # In case we lose a partition, we still need to resume it, otherwise it won't be fetched
  # again if we get reassigned to it later on. We keep these as after revocation we may
  # no longer be able to fetch them from Kafka. We could rebuild them, but it is easier
  # to just keep them here and use them when they cannot be obtained otherwise
  @paused_tpls = Hash.new { |h, k| h[k] = {} }
end
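A minimal usage sketch is shown below. The subscription_group object is an assumption here: in a running Karafka process it is normally provided by the framework's routing and runner internals rather than built by hand.

# Hypothetical sketch: `subscription_group` is assumed to come from Karafka routing
client = Karafka::Connection::Client.new(subscription_group)

client.id # => random hex identifier, e.g. "a1b2c3d4e5f6"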

Instance Attribute Details

#id ⇒ String (readonly)

Returns id of the client.

Returns:

  • (String)

    id of the client



# File 'lib/karafka/connection/client.rb', line 18

def id
  @id
end

#name ⇒ String (readonly)

Note:

Consumer name may change in case we regenerate it

Returns underlying consumer name.

Returns:

  • (String)

    underlying consumer name



# File 'lib/karafka/connection/client.rb', line 15

def name
  @name
end

#rebalance_manager ⇒ Object (readonly)

Returns the value of attribute rebalance_manager.



# File 'lib/karafka/connection/client.rb', line 11

def rebalance_manager
  @rebalance_manager
end

Instance Method Details

#batch_poll ⇒ Karafka::Connection::MessagesBuffer

Note:

This method should not be executed from many threads at the same time

Fetches messages within boundaries defined by the settings (time, size, topics, etc).

Returns:

  • (Karafka::Connection::MessagesBuffer)

    messages buffer with the polled messages

# File 'lib/karafka/connection/client.rb', line 60

def batch_poll
  time_poll = TimeTrackers::Poll.new(@subscription_group.max_wait_time)

  @buffer.clear
  @rebalance_manager.clear

  loop do
    time_poll.start

    # Don't fetch more messages if we do not have any time left
    break if time_poll.exceeded?
    # Don't fetch more messages if we've fetched max as we've wanted
    break if @buffer.size >= @subscription_group.max_messages

    # Fetch message within our time boundaries
    message = poll(time_poll.remaining)

    # Put a message to the buffer if there is one
    @buffer << message if message

    # Upon polling, the rebalance manager might have been updated.
    # If partition revocation happens, we need to remove messages from revoked partitions
    # as well as ensure we do not have duplicates due to the offset reset for partitions
    # that we got assigned
    # We also break early, so the information about the rebalance is used as soon as possible
    if @rebalance_manager.changed?
      remove_revoked_and_duplicated_messages
      break
    end

    # Track time spent on all of the processing and polling
    time_poll.checkpoint

    # Finally, once we've (potentially) removed revoked messages, etc., if no messages were
    # returned we can break.
    # Worth keeping in mind that the rebalance manager might have been updated despite no
    # messages being returned during a poll
    break unless message
  end

  @buffer
end
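A rough sketch of how a fetch loop could use this method, assuming client was built as in the constructor example above. Since the method clears and returns the internal buffer, messages should be processed (or copied) before polling again:

# Sketch only: `client` is a Karafka::Connection::Client instance
buffer = client.batch_poll
buffer.size # => number of messages fetched within the configured time/size boundaries

# A subsequent call clears that same buffer first and refills it
buffer = client.batch_poll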

#commit_offsets(async: true) ⇒ Boolean

Note:

This will commit all the offsets for the whole consumer. To achieve granular control over where the offset should be for particular topic partitions, store_offset should be used to store a new offset only when we want it to be flushed.

Commits the offsets on the current consumer in a non-blocking or blocking way, ignoring the case where there is no offset to commit (for example when a rebalance occurs).

Parameters:

  • async (Boolean) (defaults to: true)

    should the commit happen async or sync (async by default)

Returns:

  • (Boolean)

whether committing was successful. It may not be when we no longer own the given partition.



# File 'lib/karafka/connection/client.rb', line 122

def commit_offsets(async: true)
  @mutex.lock

  internal_commit_offsets(async: async)
ensure
  @mutex.unlock
end
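The note above describes the store-then-commit pattern; a hedged sketch of it, assuming message is a message previously fetched through this client:

# Sketch: offsets are first stored per topic partition, then committed
client.store_offset(message)         # remember the position for this partition
client.commit_offsets                # non-blocking commit (async: true by default)
client.commit_offsets(async: false)  # blocking commit when we must be sure it reached Kafka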

#commit_offsets! ⇒ Object

Commits offsets in a synchronous way.

See Also:

  • #commit_offsets for more details


# File 'lib/karafka/connection/client.rb', line 133

def commit_offsets!
  commit_offsets(async: false)
end

#mark_as_consumed(message) ⇒ Boolean

Note:

This method won’t trigger automatic offset commits, relying instead on the offset check-pointing that happens with each processed batch.

Marks given message as consumed.

Parameters:

Returns:

  • (Boolean)

    true if successful, false if we no longer own the given partition



# File 'lib/karafka/connection/client.rb', line 239

def mark_as_consumed(message)
  store_offset(message)
end

#mark_as_consumed!(message) ⇒ Boolean

Marks a given message as consumed and commits the offsets in a blocking way.

Parameters:

Returns:

  • (Boolean)

    true if successful, false if we no longer own the given partition



# File 'lib/karafka/connection/client.rb', line 247

def mark_as_consumed!(message)
  return false unless mark_as_consumed(message)

  commit_offsets!
end
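A short sketch contrasting both variants, assuming message is a consumed Karafka message:

# Sketch: the non-bang version only stores the offset; it gets committed with the
# regular batch check-pointing
client.mark_as_consumed(message)

# The bang version also commits synchronously; false means we no longer own the partition
client.mark_as_consumed!(message) # => true or false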

#pause(topic, partition, offset) ⇒ Object

Note:

This will pause indefinitely and requires a manual `#resume`

Pauses the given partition and moves back to the last successfully processed offset.

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition

  • offset (Integer)

    offset of the message on which we want to pause (this message will be reprocessed after getting back to processing)



# File 'lib/karafka/connection/client.rb', line 156

def pause(topic, partition, offset)
  @mutex.lock

  # Do not pause if the client got closed, would not change anything
  return if @closed

  pause_msg = Messages::Seek.new(topic, partition, offset)

  internal_commit_offsets(async: true)

  # Here we do not use our cached tpls because we should not try to pause something we do
  # not own anymore.
  tpl = topic_partition_list(topic, partition)

  return unless tpl

  Karafka.monitor.instrument(
    'client.pause',
    caller: self,
    subscription_group: @subscription_group,
    topic: topic,
    partition: partition,
    offset: offset
  )

  @paused_tpls[topic][partition] = tpl

  @kafka.pause(tpl)

  @kafka.seek(pause_msg)
ensure
  @mutex.unlock
end
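A sketch of pausing on a just-received message, assuming message responds to topic, partition and offset (as Karafka messages do); that message will be fetched again once the partition is resumed:

# Sketch: pause the partition and rewind to the message we want to see again
client.pause(message.topic, message.partition, message.offset)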

#ping ⇒ Object

Runs a single poll, ignoring all potential errors. This is used as a keep-alive in the shutdown stage, and any errors that happen here are irrelevant from the shutdown process perspective.

This is used only to trigger rebalance callbacks



# File 'lib/karafka/connection/client.rb', line 270

def ping
  poll(100)
rescue Rdkafka::RdkafkaError
  nil
end

#reset ⇒ Object

Closes and resets the client completely.



# File 'lib/karafka/connection/client.rb', line 254

def reset
  close

  @mutex.synchronize do
    @closed = false
    @offsetting = false
    @paused_tpls.clear
    @kafka = build_consumer
  end
end

#resume(topic, partition) ⇒ Object

Resumes processing of a given topic partition after it was paused.

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition



# File 'lib/karafka/connection/client.rb', line 194

def resume(topic, partition)
  @mutex.lock

  return if @closed

  # We now commit offsets on rebalances, thus we can do it async just to make sure
  internal_commit_offsets(async: true)

  # If we cannot get a fresh tpl, try to reuse the one we cached when pausing (if we have it)
  tpl = topic_partition_list(topic, partition) || @paused_tpls[topic][partition]

  return unless tpl

  # If we did not have it, it means we never paused this partition, thus no resume should
  # happen in the first place
  return unless @paused_tpls[topic].delete(partition)

  Karafka.monitor.instrument(
    'client.resume',
    caller: self,
    subscription_group: @subscription_group,
    topic: topic,
    partition: partition
  )

  @kafka.resume(tpl)
ensure
  @mutex.unlock
end
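And the matching sketch for resuming the partition paused in the example above; resuming a partition that was never paused is a no-op, as the guard in the code shows:

# Sketch: resume fetching from the previously paused partition
client.resume(message.topic, message.partition)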

#seek(message) ⇒ Object

Seeks to a particular message. The next poll on the topic/partition will return the message at the given offset.

Parameters:



# File 'lib/karafka/connection/client.rb', line 141

def seek(message)
  @mutex.lock

  @kafka.seek(message)
ensure
  @mutex.unlock
end
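A sketch of seeking back to a chosen offset. It builds a Messages::Seek the same way the pause code above does; the topic name and numbers are hypothetical:

# Sketch: seek so the next poll on this topic partition starts at offset 100
seek_message = Karafka::Messages::Seek.new('example_topic', 0, 100)
client.seek(seek_message)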

#stop ⇒ Object

Note:

Stopping running consumers without a really important reason is not recommended, as until all the consumers are stopped, the server will keep running and serve only part of the messages.

Gracefully stops topic consumption.



229
230
231
# File 'lib/karafka/connection/client.rb', line 229

def stop
  close
end

#store_offset(message) ⇒ Object

Stores offset for a given partition of a given topic based on the provided message.

Parameters:



# File 'lib/karafka/connection/client.rb', line 106

def store_offset(message)
  @mutex.synchronize do
    internal_store_offset(message)
  end
end