Class: Karafka::BaseConsumer
- Inherits: Object (hierarchy: Object, Karafka::BaseConsumer)
- Extended by: Forwardable
- Includes: Core::Taggable
- Defined in: lib/karafka/base_consumer.rb
Overview
Base consumer from which all Karafka consumers should inherit
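As a rough illustration of what inheriting from the base consumer can look like, the sketch below defines a subclass with a `consume` method that reads from `messages`. So that it runs without the gem installed, it declares a tiny stand-in for `Karafka::BaseConsumer` when the real class is absent. The stand-in, `ExampleConsumer`, `FakeMessage`, and the `seen` reader are assumptions made for this example and are not part of Karafka.

```ruby
# Stand-in so the sketch runs without the karafka gem installed.
# The real class is defined in lib/karafka/base_consumer.rb.
unless defined?(Karafka::BaseConsumer)
  module Karafka
    class BaseConsumer
      attr_accessor :messages
    end
  end
end

# Hypothetical message type: only #payload is modelled here.
FakeMessage = Struct.new(:payload)

# Hypothetical consumer inheriting from the base class.
class ExampleConsumer < Karafka::BaseConsumer
  attr_reader :seen

  # Collects the payloads of the current batch exposed via #messages.
  def consume
    @seen = messages.map(&:payload)
  end
end

consumer = ExampleConsumer.new
consumer.messages = [FakeMessage.new('a'), FakeMessage.new('b')]
consumer.consume
```

In a real application the framework builds the consumer and assigns the batch itself; the manual assignment above only exists to make the sketch self-contained.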
Direct Known Subclasses
ActiveJob::Consumer, Pro::RecurringTasks::Consumer, Pro::ScheduledMessages::Consumer
Instance Attribute Summary
- #client ⇒ Karafka::Connection::Client
  Kafka connection client.
- #coordinator ⇒ Karafka::Processing::Coordinator
  Coordinator.
- #id ⇒ String (readonly)
  Id of the current consumer.
- #messages ⇒ Karafka::Routing::Topic
  Topic to which a given consumer is subscribed.
- #producer ⇒ Waterdrop::Producer
  Producer instance.
Instance Method Summary
- #initialize ⇒ BaseConsumer (constructor)
  Creates a new consumer and assigns it an id.
- #on_after_consume ⇒ Object
- #on_before_consume ⇒ Object
  Can be used to run preparation code in the worker.
- #on_before_schedule_consume ⇒ Object
  Can be used to run preparation code prior to the job being enqueued.
- #on_before_schedule_eofed ⇒ Object
  Can be used to run code prior to scheduling of eofed execution.
- #on_before_schedule_idle ⇒ Object
  Can be used to run code prior to scheduling of idle execution.
- #on_before_schedule_revoked ⇒ Object
  Can be used to run code prior to scheduling of revoked execution.
- #on_before_schedule_shutdown ⇒ Object
  Can be used to run code prior to scheduling of shutdown execution.
- #on_consume ⇒ Boolean
  Executes the default consumer flow.
- #on_eofed ⇒ Object
  Trigger method for running on eof without messages.
- #on_idle ⇒ Object
  Trigger method for running on idle runs without messages.
- #on_initialized ⇒ Object
  Trigger method running after consumer is fully initialized.
- #on_revoked ⇒ Object
  Trigger method for running on partition revocation.
- #on_shutdown ⇒ Object
  Trigger method for running on shutdown.
- #on_wrap(action, &block) ⇒ Object
  Executes the default wrapping flow.
Constructor Details
#initialize ⇒ BaseConsumer
Creates a new consumer and assigns it an id.
# File 'lib/karafka/base_consumer.rb', line 34
def initialize
  @id = SecureRandom.hex(6)
  @used = false
end
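To make the shape of the generated id concrete, here is a small sketch using only Ruby's standard library. `SecureRandom.hex(n)` returns `2 * n` hexadecimal characters, so `hex(6)` gives a 12-character string. The variable name `id` is just for illustration.

```ruby
require 'securerandom'

# hex(6) draws 6 random bytes and encodes them as 12 lowercase hex characters.
id = SecureRandom.hex(6)

puts id.length
```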
Instance Attribute Details
#client ⇒ Karafka::Connection::Client
Returns the Kafka connection client.
# File 'lib/karafka/base_consumer.rb', line 27
def client
  @client
end
#coordinator ⇒ Karafka::Processing::Coordinator
Returns coordinator.
# File 'lib/karafka/base_consumer.rb', line 29
def coordinator
  @coordinator
end
#id ⇒ String (readonly)
Returns id of the current consumer.
# File 'lib/karafka/base_consumer.rb', line 23
def id
  @id
end
#messages ⇒ Karafka::Routing::Topic
Returns topic to which a given consumer is subscribed.
# File 'lib/karafka/base_consumer.rb', line 25
def messages
  @messages
end
#producer ⇒ Waterdrop::Producer
Returns producer instance.
# File 'lib/karafka/base_consumer.rb', line 31
def producer
  @producer
end
Instance Method Details
#on_after_consume ⇒ Object
Not intended for end users: this method is part of the internal lifecycle, not the public API.
We handle and report errors here because some flows can fail. For example, a DLQ flow could fail if it was not able to dispatch the DLQ message. Other "non-user" flows do not interact with external systems, and their errors are expected to bubble up.
# File 'lib/karafka/base_consumer.rb', line 125
def on_after_consume
  handle_after_consume
rescue StandardError => e
  monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: coordinator.seek_offset,
    type: 'consumer.after_consume.error'
  )

  retry_after_pause
end
#on_before_consume ⇒ Object
Not intended for end users: this method is part of the internal lifecycle, not the public API. It can serve as a hook when building non-blocking consumers and other advanced flows.
Can be used to run preparation code in the worker.
# File 'lib/karafka/base_consumer.rb', line 70
def on_before_consume
  messages.metadata.processed_at = Time.now
  messages.metadata.freeze

  # We run this after the full metadata setup, so we can use all the messages information
  # if needed
  handle_before_consume
end
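The stamp-then-freeze step can be illustrated in isolation. The sketch below sets a processing time on a metadata object and freezes it, after which further writes raise `FrozenError`. `FakeMetadata` is a hypothetical stand-in that models only `processed_at`; it is not Karafka's metadata class.

```ruby
# Hypothetical stand-in for batch metadata; only processed_at is modelled.
FakeMetadata = Struct.new(:processed_at)

metadata = FakeMetadata.new
metadata.processed_at = Time.now
metadata.freeze

# Any later write to the frozen struct is rejected by Ruby.
begin
  metadata.processed_at = Time.now
  mutated = true
rescue FrozenError
  mutated = false
end
```

Freezing before handing the object to later hooks means those hooks can read the metadata but cannot change it by accident.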
#on_before_schedule_consume ⇒ Object
Not intended for end users: this method is part of the internal lifecycle, not the public API. It should not perform any expensive operations, because it blocks and runs in the listener thread.
Can be used to run preparation code prior to the job being enqueued
# File 'lib/karafka/base_consumer.rb', line 59
def on_before_schedule_consume
  @used = true
  handle_before_schedule_consume
end
#on_before_schedule_eofed ⇒ Object
Can be used to run code prior to scheduling of eofed execution
# File 'lib/karafka/base_consumer.rb', line 140
def on_before_schedule_eofed
  handle_before_schedule_eofed
end
#on_before_schedule_idle ⇒ Object
Can be used to run code prior to scheduling of idle execution
# File 'lib/karafka/base_consumer.rb', line 160
def on_before_schedule_idle
  handle_before_schedule_idle
end
#on_before_schedule_revoked ⇒ Object
Can be used to run code prior to scheduling of revoked execution
# File 'lib/karafka/base_consumer.rb', line 174
def on_before_schedule_revoked
  handle_before_schedule_revoked
end
#on_before_schedule_shutdown ⇒ Object
Can be used to run code prior to scheduling of shutdown execution
# File 'lib/karafka/base_consumer.rb', line 195
def on_before_schedule_shutdown
  handle_before_schedule_shutdown
end
#on_consume ⇒ Boolean
We keep tracking the seek offset and use it to compensate for asynchronous offset flushing that may not have kicked in yet when an error occurs. That way we always pause on the last processed message.
Executes the default consumer flow.
# File 'lib/karafka/base_consumer.rb', line 105
def on_consume
  handle_consume
rescue StandardError => e
  monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: coordinator.seek_offset,
    type: 'consumer.consume.error'
  )
end
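The rescue-and-report shape used here (and in the other `on_*` trigger methods) can be tried out with plain Ruby. In the sketch below, `RecordingMonitor` and `SketchConsumer` are hypothetical stand-ins: the monitor just stores events in an array, and the consumer's `handle_consume` always fails. Only the event name and `type` value are taken from the listing above; the `seek_offset` field is left out because the sketch has no coordinator.

```ruby
# Hypothetical monitor that records events instead of publishing them.
class RecordingMonitor
  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(name, payload = {})
    @events << [name, payload]
  end
end

# Hypothetical consumer-like class mirroring the rescue-and-report shape.
class SketchConsumer
  def initialize(monitor)
    @monitor = monitor
  end

  def on_consume
    handle_consume
  rescue StandardError => e
    # The error is reported as an event rather than re-raised.
    @monitor.instrument(
      'error.occurred',
      error: e,
      caller: self,
      type: 'consumer.consume.error'
    )
  end

  private

  # Simulates a failing processing step.
  def handle_consume
    raise ArgumentError, 'boom'
  end
end

monitor = RecordingMonitor.new
SketchConsumer.new(monitor).on_consume
name, payload = monitor.events.first
```

Because the error is rescued inside `on_consume`, the caller sees no exception; anything that wants to react to the failure has to listen for the reported event.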
#on_eofed ⇒ Object
Trigger method for running on eof without messages
# File 'lib/karafka/base_consumer.rb', line 145
def on_eofed
  handle_eofed
rescue StandardError => e
  monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: coordinator.seek_offset,
    type: 'consumer.eofed.error'
  )
end
#on_idle ⇒ Object
Trigger method for running on idle runs without messages
# File 'lib/karafka/base_consumer.rb', line 167
def on_idle
  handle_idle
end
#on_initialized ⇒ Object
Trigger method running after consumer is fully initialized.
# File 'lib/karafka/base_consumer.rb', line 42
def on_initialized
  handle_initialized
rescue StandardError => e
  monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.initialized.error'
  )
end
#on_revoked ⇒ Object
Trigger method for running on partition revocation.
# File 'lib/karafka/base_consumer.rb', line 181
def on_revoked
  handle_revoked
rescue StandardError => e
  monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.revoked.error'
  )
end
#on_shutdown ⇒ Object
Trigger method for running on shutdown.
# File 'lib/karafka/base_consumer.rb', line 202
def on_shutdown
  handle_shutdown
rescue StandardError => e
  monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.shutdown.error'
  )
end
#on_wrap(action, &block) ⇒ Object
Executes the default wrapping flow
# File 'lib/karafka/base_consumer.rb', line 85
def on_wrap(action, &block)
  handle_wrap(action, &block)
rescue StandardError => e
  monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.wrap.error'
  )
end
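The listing does not show what `handle_wrap` does with the block, so the following is only a sketch of the general around-style pattern that forwarding `&block` makes possible. It assumes the wrapping step runs some code, yields to the block, and then runs more code. `WrapSketch` and its `log` are hypothetical and not part of Karafka.

```ruby
# Hypothetical class: assumes the wrapping step simply yields to the block.
class WrapSketch
  attr_reader :log

  def initialize
    @log = []
  end

  # Forwards the action name and the block, as in the listing above.
  def on_wrap(action, &block)
    handle_wrap(action, &block)
  end

  private

  # Runs code before and after the wrapped block and returns its result.
  def handle_wrap(action)
    @log << "before #{action}"
    result = yield
    @log << "after #{action}"
    result
  end
end

sketch = WrapSketch.new
value = sketch.on_wrap(:consume) { 42 }
```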