Class: NulogyMessageBusConsumer::Steps::LogMessages
- Inherits: Object
  - Object
  - NulogyMessageBusConsumer::Steps::LogMessages
- Defined in: lib/nulogy_message_bus_consumer/steps/log_messages.rb
Instance Method Summary
- #call(message:, **_) ⇒ Object
- #diff_millis(oldest_nanos, newest_millis) ⇒ Object
  Debezium appears to be giving us nanos since epoch. See github.com/debezium/debezium/blob/5a115e902cdc1dc399ec02758dd1039a33e99bc2/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java#L237
- #initialize(logger, clock: Clock.new) ⇒ LogMessages (constructor)
  A new instance of LogMessages.
Constructor Details
#initialize(logger, clock: Clock.new) ⇒ LogMessages
Returns a new instance of LogMessages.
    # File 'lib/nulogy_message_bus_consumer/steps/log_messages.rb', line 4
    def initialize(logger, clock: Clock.new)
      @logger = logger
      @clock = clock
    end
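The constructor takes the clock as a keyword argument, and the only method this page shows being called on it is `ms`. A minimal sketch of clocks that could be injected, assuming `ms` returns integer milliseconds since the Unix epoch; `SystemClock` and `FixedClock` are hypothetical names and are not the gem's own `Clock` class, which is not listed here:

```ruby
# Hypothetical stand-ins; the gem's Clock implementation is not shown on this page.
class SystemClock
  # Wall-clock time as integer milliseconds since the Unix epoch.
  def ms
    Process.clock_gettime(Process::CLOCK_REALTIME, :millisecond)
  end
end

# Always reports the same instant, which keeps timing assertions deterministic.
class FixedClock
  attr_reader :ms

  def initialize(ms)
    @ms = ms
  end
end

clock = FixedClock.new(1_700_000_000_000)
puts clock.ms
```

A fixed clock like this would be passed as `clock:` in a test, leaving the default in place for production use.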
Instance Method Details
#call(message:, **_) ⇒ Object
    # File 'lib/nulogy_message_bus_consumer/steps/log_messages.rb', line 9
    def call(message:, **_)
      @logger.info(JSON.dump({
        event: "message_received",
        kafka_message_id: message.id,
        message: "Received #{message.id}"
      }))

      result = yield

      millis = diff_millis(message.created_at, @clock.ms)
      @logger.info(JSON.dump({
        event: "message_processed",
        kafka_message_id: message.id,
        message: "Processed #{message.id} (#{message.topic}##{message.partition}@#{message.offset})",
        result: result,
        time_to_processed: millis
      }))
      result
    end
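To illustrate how the step wraps a block, here is a sketch that runs without the gem installed. `LogMessagesSketch` is a local copy of the methods listed on this page; `SketchMessage`, `StubClock`, and all sample values are hypothetical, and the `created_at` value is chosen only so that the division by 1000 lands on a millisecond timestamp:

```ruby
require "json"
require "logger"
require "stringio"

# Local copy of the step as listed on this page, so the sketch runs without the gem.
class LogMessagesSketch
  def initialize(logger, clock:)
    @logger = logger
    @clock = clock
  end

  def call(message:, **_)
    @logger.info(JSON.dump({
      event: "message_received",
      kafka_message_id: message.id,
      message: "Received #{message.id}"
    }))

    result = yield

    millis = diff_millis(message.created_at, @clock.ms)
    @logger.info(JSON.dump({
      event: "message_processed",
      kafka_message_id: message.id,
      message: "Processed #{message.id} (#{message.topic}##{message.partition}@#{message.offset})",
      result: result,
      time_to_processed: millis
    }))
    result
  end

  def diff_millis(oldest_nanos, newest_millis)
    old_millis = oldest_nanos / 1000
    newest_millis - old_millis
  end
end

# Hypothetical stand-ins for the consumer's message and clock objects.
SketchMessage = Struct.new(:id, :topic, :partition, :offset, :created_at, keyword_init: true)
StubClock = Struct.new(:ms)

# Capture log output in memory, one JSON document per line.
io = StringIO.new
logger = Logger.new(io)
logger.formatter = proc { |_severity, _time, _progname, msg| "#{msg}\n" }

step = LogMessagesSketch.new(logger, clock: StubClock.new(1_700_000_000_250))
message = SketchMessage.new(id: "abc", topic: "orders", partition: 0, offset: 42,
                            created_at: 1_700_000_000_000_000)

# The block stands in for the rest of the pipeline; its value is passed through.
result = step.call(message: message) { "success" }
events = io.string.lines.map { |line| JSON.parse(line) }
puts events.map { |e| e["event"] }.inspect
```

The step logs once before yielding and once after, and returns whatever the block returned, so it can sit in front of other steps without changing their result.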
#diff_millis(oldest_nanos, newest_millis) ⇒ Object
Debezium appears to be giving us nanos since epoch. See github.com/debezium/debezium/blob/5a115e902cdc1dc399ec02758dd1039a33e99bc2/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java#L237
    # File 'lib/nulogy_message_bus_consumer/steps/log_messages.rb', line 32
    def diff_millis(oldest_nanos, newest_millis)
      old_millis = oldest_nanos / 1000
      newest_millis - old_millis
    end
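The unit arithmetic in this method is worth checking by hand. Dividing by 1000 turns microseconds into milliseconds, whereas nanoseconds would need a divisor of 1,000,000. The sketch below repeats the listed method and feeds it the same instant expressed in both units; the timestamps are made up, and which unit Debezium actually emits for a given column is not established by this page:

```ruby
# Same arithmetic as the diff_millis listing above.
def diff_millis(oldest_nanos, newest_millis)
  old_millis = oldest_nanos / 1000
  newest_millis - old_millis
end

created_ms = 1_700_000_000_000 # hypothetical creation time, in milliseconds
now_ms = created_ms + 250      # 250 ms later

as_micros = created_ms * 1_000     # the same instant in microseconds
as_nanos = created_ms * 1_000_000  # the same instant in nanoseconds

puts diff_millis(as_micros, now_ms) # 250
puts diff_millis(as_nanos, now_ms)  # a large negative number
```

If `time_to_processed` shows up as a large negative value in the logs, a unit mismatch of this kind would be one thing to check.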