Class: NulogyMessageBusConsumer::Steps::CommitOnSuccess

Inherits:
Object
  • Object
show all
Defined in:
lib/nulogy_message_bus_consumer/steps/commit_on_success.rb

Instance Method Summary collapse

Constructor Details

#initialize(logger) ⇒ CommitOnSuccess

Returns a new instance of CommitOnSuccess.



4
5
6
# File 'lib/nulogy_message_bus_consumer/steps/commit_on_success.rb', line 4

# Builds the step around the logger it reports through.
#
# @param logger [#info] kept in @logger; #call sends it one JSON-encoded
#   line through #info for every message it handles
def initialize(logger)
  @logger = logger
end

Instance Method Details

#call(kafka_consumer:, message:, **_) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/nulogy_message_bus_consumer/steps/commit_on_success.rb', line 8

# Runs the downstream processing (the given block) and then either commits
# the message's offset or arranges for the same message to be reprocessed.
#
# @param kafka_consumer [#store_offset, #commit] consumer the message came from
# @param message [#id, #to_h] the message being processed
# @yieldreturn [Object] the processing outcome; +:success+ selects the commit
#   path, any other value that passes +raise_if_invalid+ selects the failure path
# @return [Object] the block's outcome, passed through untouched
#
# NOTE(review): +raise_if_invalid+ and +reconnect_to_reprocess_same_message+
# are defined elsewhere in this class; presumably the former raises on an
# unrecognised outcome and the latter resets the consumer so the message is
# delivered again -- confirm against their definitions.
def call(kafka_consumer:, message:, **_)
  outcome = yield
  raise_if_invalid(outcome)

  committed = outcome == :success
  if committed
    # Store the offset first, then commit it.
    kafka_consumer.store_offset(message)
    kafka_consumer.commit
  else
    reconnect_to_reprocess_same_message(kafka_consumer)
  end

  # Both paths emit the same payload shape; only the event name differs.
  # Logging happens after the consumer has been acted upon.
  event_name = committed ? "message_committed" : "message_failed"
  log_line = JSON.dump({
    event: event_name,
    kafka_message_id: message.id,
    message: message.to_h
  })
  @logger.info(log_line)

  outcome
end