Class: Karafka::BaseConsumer

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Core::Taggable
Defined in:
lib/karafka/base_consumer.rb

Overview

Base consumer from which all Karafka consumers should inherit

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeBaseConsumer

Creates new consumer and assigns it an id



31
32
33
34
# File 'lib/karafka/base_consumer.rb', line 31

def initialize
  @id = SecureRandom.hex(6)
  @used = false
end

Instance Attribute Details

#clientKarafka::Connection::Client

Returns kafka connection client.

Returns:



24
25
26
# File 'lib/karafka/base_consumer.rb', line 24

def client
  @client
end

#coordinatorKarafka::Processing::Coordinator

Returns coordinator.

Returns:



26
27
28
# File 'lib/karafka/base_consumer.rb', line 26

def coordinator
  @coordinator
end

#idString (readonly)

Returns id of the current consumer.

Returns:

  • (String)

    id of the current consumer



20
21
22
# File 'lib/karafka/base_consumer.rb', line 20

def id
  @id
end

#messagesKarafka::Routing::Topic

Returns topic to which a given consumer is subscribed.

Returns:



22
23
24
# File 'lib/karafka/base_consumer.rb', line 22

def messages
  @messages
end

#producerWaterdrop::Producer

Returns producer instance.

Returns:

  • (Waterdrop::Producer)

    producer instance



28
29
30
# File 'lib/karafka/base_consumer.rb', line 28

def producer
  @producer
end

Instance Method Details

#on_after_consumeObject

Note:

This should not be used by the end users as it is part of the lifecycle of things but not as part of the public api.

Note:

We handle and report errors here because of flows that could fail. For example a DLQ flow could fail if it was not able to dispatch the DLQ message. Other “non-user” based flows do not interact with external systems and their errors are expected to bubble up



105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/karafka/base_consumer.rb', line 105

def on_after_consume
  handle_after_consume
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: coordinator.seek_offset,
    type: 'consumer.after_consume.error'
  )

  retry_after_pause
end

#on_before_consumeObject

Note:

This should not be used by the end users as it is part of the lifecycle of things and not as part of the public api. This can act as a hook when creating non-blocking consumers and doing other advanced stuff

Can be used to run preparation code in the worker



67
68
69
70
71
72
73
74
# File 'lib/karafka/base_consumer.rb', line 67

def on_before_consume
  messages..processed_at = Time.now
  messages..freeze

  # We run this after the full metadata setup, so we can use all the messages information
  # if needed
  handle_before_consume
end

#on_before_schedule_consumeObject

Note:

This should not be used by the end users as it is part of the lifecycle of things and not as a part of the public api. This should not perform any extensive operations as it is blocking and running in the listener thread.

Can be used to run preparation code prior to the job being enqueued



56
57
58
59
# File 'lib/karafka/base_consumer.rb', line 56

def on_before_schedule_consume
  @used = true
  handle_before_schedule_consume
end

#on_before_schedule_eofedObject

Can be used to run code prior to scheduling of eofed execution



120
121
122
# File 'lib/karafka/base_consumer.rb', line 120

def on_before_schedule_eofed
  handle_before_schedule_eofed
end

#on_before_schedule_idleObject

Can be used to run code prior to scheduling of idle execution



140
141
142
# File 'lib/karafka/base_consumer.rb', line 140

def on_before_schedule_idle
  handle_before_schedule_idle
end

#on_before_schedule_revokedObject

Can be used to run code prior to scheduling of revoked execution



154
155
156
# File 'lib/karafka/base_consumer.rb', line 154

def on_before_schedule_revoked
  handle_before_schedule_revoked
end

#on_before_schedule_shutdownObject

Can be used to run code prior to scheduling of revoked execution



175
176
177
# File 'lib/karafka/base_consumer.rb', line 175

def on_before_schedule_shutdown
  handle_before_schedule_shutdown
end

#on_consumeBoolean

Note:

We keep the seek offset tracking, and use it to compensate for async offset flushing that may not yet kick in when error occurs. That way we pause always on the last processed message.

Executes the default consumer flow.

Returns:

  • (Boolean)

    true if there was no exception, otherwise false.



85
86
87
88
89
90
91
92
93
94
95
# File 'lib/karafka/base_consumer.rb', line 85

def on_consume
  handle_consume
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: coordinator.seek_offset,
    type: 'consumer.consume.error'
  )
end

#on_eofedObject

Trigger method for running on eof without messages



125
126
127
128
129
130
131
132
133
134
135
# File 'lib/karafka/base_consumer.rb', line 125

def on_eofed
  handle_eofed
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: coordinator.seek_offset,
    type: 'consumer.eofed.error'
  )
end

#on_idleObject

Trigger method for running on idle runs without messages



147
148
149
# File 'lib/karafka/base_consumer.rb', line 147

def on_idle
  handle_idle
end

#on_initializedObject

Trigger method running after consumer is fully initialized.



39
40
41
42
43
44
45
46
47
48
# File 'lib/karafka/base_consumer.rb', line 39

def on_initialized
  handle_initialized
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.initialized.error'
  )
end

#on_revokedObject

Trigger method for running on partition revocation.



161
162
163
164
165
166
167
168
169
170
# File 'lib/karafka/base_consumer.rb', line 161

def on_revoked
  handle_revoked
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.revoked.error'
  )
end

#on_shutdownObject

Trigger method for running on shutdown.



182
183
184
185
186
187
188
189
190
191
# File 'lib/karafka/base_consumer.rb', line 182

def on_shutdown
  handle_shutdown
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.shutdown.error'
  )
end