Class: Karafka::BaseConsumer

Inherits:
Object
Defined in:
lib/karafka/base_consumer.rb

Overview

Base consumer from which all Karafka consumers should inherit

Direct Known Subclasses

ActiveJob::Consumer, Pro::BaseConsumer

Constructor Details

#initialize ⇒ BaseConsumer

Creates a new consumer and assigns it an id.



# File 'lib/karafka/base_consumer.rb', line 21

def initialize
  @id = SecureRandom.hex(6)
end
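
As a minimal sketch of what this id looks like: `SecureRandom.hex(6)` draws 6 random bytes and returns them as a 12-character hexadecimal string. The `IdentifiedConsumer` class below is a hypothetical stand-in, not part of Karafka; only the id assignment mirrors the constructor above.

```ruby
require 'securerandom'

# Hypothetical stand-in class; only the id assignment mirrors the constructor above.
class IdentifiedConsumer
  attr_reader :id

  def initialize
    # 6 random bytes encoded as 12 hexadecimal characters
    @id = SecureRandom.hex(6)
  end
end

consumer = IdentifiedConsumer.new

puts consumer.id.length                    # 12
puts consumer.id.match?(/\A[0-9a-f]{12}\z/) # true
```

The id is random rather than sequential, so it is suited to telling consumer instances apart in logs, not to ordering them.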

Instance Attribute Details

#client ⇒ Karafka::Connection::Client

Returns the Kafka connection client.

Returns:

  • (Karafka::Connection::Client)

    Kafka connection client

# File 'lib/karafka/base_consumer.rb', line 14

def client
  @client
end

#coordinator ⇒ Karafka::Processing::Coordinator

Returns coordinator.

Returns:

  • (Karafka::Processing::Coordinator)

    coordinator

# File 'lib/karafka/base_consumer.rb', line 16

def coordinator
  @coordinator
end

#id ⇒ String (readonly)

Returns id of the current consumer.

Returns:

  • (String)

    id of the current consumer



# File 'lib/karafka/base_consumer.rb', line 8

def id
  @id
end

#messages ⇒ Karafka::Messages::Messages

Returns current messages batch.

Returns:

  • (Karafka::Messages::Messages)

    current messages batch

# File 'lib/karafka/base_consumer.rb', line 12

def messages
  @messages
end

#producer ⇒ WaterDrop::Producer

Returns the producer instance.

Returns:

  • (WaterDrop::Producer)

    producer instance

# File 'lib/karafka/base_consumer.rb', line 18

def producer
  @producer
end

#topic ⇒ Karafka::Routing::Topic

Returns topic to which a given consumer is subscribed.

Returns:

  • (Karafka::Routing::Topic)

    topic to which a given consumer is subscribed

# File 'lib/karafka/base_consumer.rb', line 10

def topic
  @topic
end

Instance Method Details

#on_after_consume ⇒ Object

Note:

This is part of the consumer lifecycle rather than the public API, so end users should not call it directly.



# File 'lib/karafka/base_consumer.rb', line 86

def on_after_consume
  handle_after_consume
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.after_consume.error'
  )
end

#on_before_consume ⇒ Object

Note:

This is part of the consumer lifecycle rather than the public API, so end users should not call it directly. It can act as a hook when creating non-blocking consumers and for other advanced use cases.

Can be used to run preparation code in the worker.



# File 'lib/karafka/base_consumer.rb', line 48

def on_before_consume
  messages.metadata.processed_at = Time.now
  messages.metadata.freeze

  # We run this after the full metadata setup, so we can use all the messages information
  # if needed
  handle_before_consume
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.before_consume.error'
  )
end
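
The stamp-then-freeze step at the top of this hook can be sketched in isolation. The `BatchMetadata` struct below is a hypothetical stand-in for the batch metadata object, which is not shown on this page; the sketch only illustrates that once the processing time is set and the object is frozen, later writes raise `FrozenError`.

```ruby
# Hypothetical stand-in for the batch metadata object (an assumption, not Karafka's class).
BatchMetadata = Struct.new(:topic, :processed_at)

metadata = BatchMetadata.new('events', nil)

# Stamp the processing time, then freeze so later code cannot alter it
metadata.processed_at = Time.now
metadata.freeze

frozen_error = nil
begin
  # Any further write to the frozen struct is rejected
  metadata.processed_at = Time.now
rescue FrozenError => e
  frozen_error = e
end

puts metadata.frozen?    # true
puts frozen_error.class  # FrozenError
```

Freezing after the last write means the user-facing consume code sees metadata that is complete and cannot be changed by accident.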

#on_before_enqueue ⇒ Object

Note:

This is part of the consumer lifecycle rather than the public API, so end users should not call it directly. It must not perform any expensive operations, because it blocks and runs in the listener thread.

Can be used to run preparation code prior to the job being enqueued.



# File 'lib/karafka/base_consumer.rb', line 31

def on_before_enqueue
  handle_before_enqueue
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.before_enqueue.error'
  )
end

#on_consume ⇒ Boolean

Note:

We keep tracking the seek offset and use it to compensate for asynchronous offset flushing that may not have kicked in yet when an error occurs. That way we always pause on the last processed message.

Executes the default consumer flow.

Returns:

  • (Boolean)

    true if there was no exception, otherwise false.



# File 'lib/karafka/base_consumer.rb', line 71

def on_consume
  handle_consume
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: coordinator.seek_offset,
    type: 'consumer.consume.error'
  )
end
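
The rescue-and-instrument pattern shared by these `on_*` methods can be sketched without Karafka itself. Everything below is hypothetical: `SketchMonitor` is a stand-in for `Karafka.monitor`, `GuardedConsumer` is an illustrative wrapper, and the explicit `true`/`false` results are an assumption that follows the documented return contract rather than the listing above.

```ruby
# Hypothetical stand-in for Karafka.monitor: stores listeners and calls them on instrument.
class SketchMonitor
  def initialize
    @listeners = Hash.new { |hash, key| hash[key] = [] }
  end

  def subscribe(event_name, &block)
    @listeners[event_name] << block
  end

  def instrument(event_name, payload = {})
    @listeners[event_name].each { |listener| listener.call(payload) }
  end
end

# Hypothetical wrapper: runs the given work and reports failures instead of raising.
class GuardedConsumer
  def initialize(monitor, &work)
    @monitor = monitor
    @work = work
  end

  def on_consume
    @work.call
    true
  rescue StandardError => e
    # The error is published as an event so the calling thread keeps running
    @monitor.instrument('error.occurred', error: e, caller: self, type: 'consumer.consume.error')
    false
  end
end

monitor = SketchMonitor.new
reported = []
monitor.subscribe('error.occurred') { |payload| reported << payload[:type] }

ok = GuardedConsumer.new(monitor) { :done }.on_consume
failed = GuardedConsumer.new(monitor) { raise ArgumentError, 'bad payload' }.on_consume

puts ok       # true
puts failed   # false
p reported    # ["consumer.consume.error"]
```

Routing failures through a single `error.occurred` event with a `type` field lets one subscriber handle errors from every lifecycle stage.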

#on_revoked ⇒ Object

Trigger method for running on partition revocation.



# File 'lib/karafka/base_consumer.rb', line 100

def on_revoked
  handle_revoked
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.revoked.error'
  )
end

#on_shutdown ⇒ Object

Trigger method for running on shutdown.



# File 'lib/karafka/base_consumer.rb', line 114

def on_shutdown
  handle_shutdown
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.shutdown.error'
  )
end