Class: Kafka::Interceptors
- Inherits:
- Object
- Defined in:
- lib/kafka/interceptors.rb
Overview
Holds a list of custom interceptors, each of which implements #call,
and invokes them in order as a chain.
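As a rough illustration of what "implements call" means here, the sketch below defines two interceptors and chains them by hand. Message is a hypothetical stand-in for Kafka::PendingMessage, and UpcaseInterceptor and add_suffix are invented names, not part of the library. Since the chain only ever invokes call on each interceptor, a lambda is assumed to qualify as well.

```ruby
# Hypothetical stand-in for Kafka::PendingMessage (illustration only).
Message = Struct.new(:value, :topic, :partition)

# Any object that responds to #call and returns the (possibly modified)
# message can act as an interceptor.
class UpcaseInterceptor
  def call(message)
    Message.new(message.value.upcase, message.topic, message.partition)
  end
end

# A lambda also responds to #call, so it is assumed to work the same way.
add_suffix = lambda do |message|
  Message.new("#{message.value}!", message.topic, message.partition)
end

interceptors = [UpcaseInterceptor.new, add_suffix]

# Chain by hand: the value returned by one interceptor feeds the next.
result = interceptors.reduce(Message.new("hello", "greetings", 0)) do |msg, interceptor|
  interceptor.call(msg)
end

puts result.value # prints "HELLO!"
```

Each interceptor returns a new message rather than mutating its argument, which keeps every step of the chain easy to reason about; that is a design choice of this sketch, not a requirement stated by the documentation.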
Instance Method Summary
- #call(intercepted) ⇒ Kafka::PendingMessage || Kafka::FetchedBatch
  Called when the client produces a message or once batches have been fetched.
- #initialize(interceptors:, logger:) ⇒ Interceptors (constructor)
  Returns a new instance of Interceptors.
Constructor Details
#initialize(interceptors:, logger:) ⇒ Interceptors
Returns a new instance of Interceptors.
# File 'lib/kafka/interceptors.rb', line 7

def initialize(interceptors:, logger:)
  @interceptors = interceptors || []
  @logger = TaggedLogger.new(logger)
end
Instance Method Details
#call(intercepted) ⇒ Kafka::PendingMessage || Kafka::FetchedBatch
Called when the client produces a message or once batches have been fetched. The value returned by the first interceptor is passed to the second, and so on down the chain. This method does not raise exceptions: an error raised by an interceptor is rescued and logged as a warning, and the chain continues with the value returned by the last successful interceptor.
# File 'lib/kafka/interceptors.rb', line 21

def call(intercepted)
  @interceptors.each do |interceptor|
    begin
      intercepted = interceptor.call(intercepted)
    rescue Exception => e
      @logger.warn "Error executing interceptor for topic: #{intercepted.topic} partition: #{intercepted.partition}: #{e.message}\n#{e.backtrace.join("\n")}"
    end
  end

  intercepted
end
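To make the error handling concrete, the following sketch runs a chain in which the middle interceptor fails. ChainSketch is a hypothetical stand-in that mirrors the listing above so the example runs without the kafka gem. It uses a plain Logger writing to a StringIO instead of the library's TaggedLogger and logs only the exception message. Record is likewise an invented substitute for Kafka::PendingMessage.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in for Kafka::PendingMessage (illustration only).
Record = Struct.new(:value, :topic, :partition)

# Stand-in mirroring the documented method; not the library class itself.
class ChainSketch
  def initialize(interceptors:, logger:)
    @interceptors = interceptors || []
    @logger = logger # assumption: plain Logger instead of TaggedLogger
  end

  def call(intercepted)
    @interceptors.each do |interceptor|
      begin
        intercepted = interceptor.call(intercepted)
      rescue Exception => e
        # The assignment above never happened, so `intercepted` still holds
        # the value from the last interceptor that succeeded.
        @logger.warn "Error executing interceptor for topic: #{intercepted.topic} " \
                     "partition: #{intercepted.partition}: #{e.message}"
      end
    end

    intercepted
  end
end

log_output = StringIO.new
chain = ChainSketch.new(
  interceptors: [
    ->(r) { Record.new("#{r.value}-a", r.topic, r.partition) },
    ->(_r) { raise ArgumentError, "boom" }, # fails: logged, then skipped
    ->(r) { Record.new("#{r.value}-c", r.topic, r.partition) }
  ],
  logger: Logger.new(log_output)
)

final = chain.call(Record.new("msg", "events", 0))
puts final.value # prints "msg-a-c"
```

The failing interceptor contributes nothing to the result, while the interceptors before and after it still run. Note that rescuing Exception rather than StandardError also catches signals such as Interrupt, so an interceptor cannot abort the chain by raising.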