Class: NulogyMessageBusConsumer::Steps::CommitOnSuccess
- Inherits:
-
Object
- Object
- NulogyMessageBusConsumer::Steps::CommitOnSuccess
- Defined in:
- lib/nulogy_message_bus_consumer/steps/commit_on_success.rb
Instance Method Summary collapse
- #call(kafka_consumer:, message:, **_) ⇒ Object
-
#initialize(logger) ⇒ CommitOnSuccess
constructor
A new instance of CommitOnSuccess.
Constructor Details
#initialize(logger) ⇒ CommitOnSuccess
Returns a new instance of CommitOnSuccess.
4 5 6 |
# File 'lib/nulogy_message_bus_consumer/steps/commit_on_success.rb', line 4 def initialize(logger) @logger = logger end |
Instance Method Details
#call(kafka_consumer:, message:, **_) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/nulogy_message_bus_consumer/steps/commit_on_success.rb', line 8 def call(kafka_consumer:, message:, **_) result = yield raise_if_invalid(result) if result == :success kafka_consumer.store_offset() kafka_consumer.commit @logger.info(JSON.dump({ event: "message_committed", kafka_message_id: .id, message: .to_h })) else (kafka_consumer) @logger.info(JSON.dump({ event: "message_failed", kafka_message_id: .id, message: .to_h })) end result end |