Class: Karafka::BaseConsumer

Inherits: Object

Defined in: lib/karafka/base_consumer.rb
Overview
Base consumer from which all Karafka consumers should inherit
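To illustrate the intended use, here is a minimal sketch of a consumer inheriting from this class; the EventsConsumer name and the payload handling are illustrative assumptions and not part of this file.

# Minimal sketch of a Karafka::BaseConsumer subclass (EventsConsumer is an illustrative name)
class EventsConsumer < Karafka::BaseConsumer
  # Karafka invokes #consume with the current #messages batch already assigned
  def consume
    messages.each do |message|
      # Handle a single message payload; a real application would do domain work here
      puts message.payload
    end
  end
end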
Instance Attribute Summary
- #client ⇒ Karafka::Connection::Client
  Kafka connection client.
- #coordinator ⇒ Karafka::Processing::Coordinator
  Coordinator.
- #id ⇒ String (readonly)
  Id of the current consumer.
- #messages ⇒ Karafka::Messages::Messages
  Current messages batch.
- #producer ⇒ Waterdrop::Producer
  Producer instance.
- #topic ⇒ Karafka::Routing::Topic
  Topic to which a given consumer is subscribed.
Instance Method Summary
- #initialize ⇒ BaseConsumer (constructor)
  Creates a new consumer and assigns it an id.
- #on_after_consume ⇒ Object
- #on_before_consume ⇒ Object
  Can be used to run preparation code in the worker.
- #on_before_enqueue ⇒ Object
  Can be used to run preparation code prior to the job being enqueued.
- #on_consume ⇒ Boolean
  Executes the default consumer flow.
- #on_revoked ⇒ Object
  Trigger method for running on partition revocation.
- #on_shutdown ⇒ Object
  Trigger method for running on shutdown.
Constructor Details
#initialize ⇒ BaseConsumer
Creates a new consumer and assigns it an id.

# File 'lib/karafka/base_consumer.rb', line 21

def initialize
  @id = SecureRandom.hex(6)
end
Instance Attribute Details
#client ⇒ Karafka::Connection::Client
Returns the Kafka connection client.

# File 'lib/karafka/base_consumer.rb', line 14

def client
  @client
end
#coordinator ⇒ Karafka::Processing::Coordinator
Returns coordinator.
# File 'lib/karafka/base_consumer.rb', line 16

def coordinator
  @coordinator
end
#id ⇒ String (readonly)
Returns id of the current consumer.
# File 'lib/karafka/base_consumer.rb', line 8

def id
  @id
end
#messages ⇒ Karafka::Messages::Messages
Returns current messages batch.
# File 'lib/karafka/base_consumer.rb', line 12

def messages
  @messages
end
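A hedged sketch of how the messages batch is typically used inside a consumer's #consume method; the process helper is a hypothetical application method, not part of this API.

def consume
  # Each element is a Karafka::Messages::Message exposing payload, offset, etc.
  messages.each do |message|
    process(message.payload) # process is a hypothetical application method
  end
end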
#producer ⇒ Waterdrop::Producer
Returns producer instance.
# File 'lib/karafka/base_consumer.rb', line 18

def producer
  @producer
end
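Because this attribute holds a WaterDrop producer, it can be used to publish messages from inside a consumer. A minimal sketch, assuming an illustrative 'events_copy' target topic:

def consume
  messages.each do |message|
    # Forward each raw payload to another topic via the WaterDrop producer
    producer.produce_async(topic: 'events_copy', payload: message.raw_payload)
  end
end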
#topic ⇒ Karafka::Routing::Topic
Returns topic to which a given consumer is subscribed.
# File 'lib/karafka/base_consumer.rb', line 10

def topic
  @topic
end
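A short sketch showing how the routing topic can be inspected from within a consumer; the log line is illustrative.

def consume
  # topic is the Karafka::Routing::Topic this consumer is subscribed to
  Karafka.logger.info("Consuming #{messages.count} message(s) from #{topic.name}")
end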
Instance Method Details
#on_after_consume ⇒ Object
Note: this method should not be used by end users, as it is part of the framework lifecycle and not part of the public API.

# File 'lib/karafka/base_consumer.rb', line 86

def on_after_consume
  handle_after_consume
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.after_consume.error'
  )
end
#on_before_consume ⇒ Object
Note: this method should not be used by end users, as it is part of the framework lifecycle and not part of the public API. It can act as a hook when creating non-blocking consumers and doing other advanced work.

Can be used to run preparation code in the worker.

# File 'lib/karafka/base_consumer.rb', line 48

def on_before_consume
  messages.metadata.processed_at = Time.now
  messages.metadata.freeze

  # We run this after the full metadata setup, so we can use all the messages information
  # if needed
  handle_before_consume
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.before_consume.error'
  )
end
#on_before_enqueue ⇒ Object
Note: this method should not be used by end users, as it is part of the framework lifecycle and not part of the public API. It should not perform any extensive operations, as it is blocking and runs in the listener thread.

Can be used to run preparation code prior to the job being enqueued.

# File 'lib/karafka/base_consumer.rb', line 31

def on_before_enqueue
  handle_before_enqueue
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.before_enqueue.error'
  )
end
#on_consume ⇒ Boolean
Note: we keep the seek offset tracking and use it to compensate for async offset flushing that may not yet have kicked in when an error occurs. That way we always pause on the last processed message.

Executes the default consumer flow.

# File 'lib/karafka/base_consumer.rb', line 71

def on_consume
  handle_consume
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    seek_offset: coordinator.seek_offset,
    type: 'consumer.consume.error'
  )
end
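All of the on_* lifecycle methods above publish failures through the 'error.occurred' instrumentation channel, so applications can observe them by subscribing to the monitor. A minimal sketch of a logging subscriber:

Karafka.monitor.subscribe('error.occurred') do |event|
  # event[:type] is e.g. 'consumer.consume.error'; event[:error] is the raised exception
  Karafka.logger.error("#{event[:type]}: #{event[:error]}")
end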
#on_revoked ⇒ Object
Trigger method for running on partition revocation.
# File 'lib/karafka/base_consumer.rb', line 100

def on_revoked
  handle_revoked
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.revoked.error'
  )
end
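End-user revocation logic goes into a #revoked method defined in the consumer subclass, which this trigger ultimately runs; a sketch with an illustrative log line:

class EventsConsumer < Karafka::BaseConsumer
  def consume
    # regular processing
  end

  # Invoked by the framework when the partition is revoked from this process
  def revoked
    Karafka.logger.warn("Partition revoked for #{topic.name}")
  end
end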
#on_shutdown ⇒ Object
Trigger method for running on shutdown.
# File 'lib/karafka/base_consumer.rb', line 114

def on_shutdown
  handle_shutdown
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    error: e,
    caller: self,
    type: 'consumer.shutdown.error'
  )
end
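Similarly, per-consumer shutdown logic goes into a #shutdown method, which this trigger runs during process shutdown; @api_client below is a hypothetical application resource.

class EventsConsumer < Karafka::BaseConsumer
  def consume
    # regular processing
  end

  # Invoked by the framework once during shutdown for this consumer
  def shutdown
    @api_client&.close # @api_client is a hypothetical application resource
  end
end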