Class: Dionysus::Consumer::BatchEventsPublisher
- Inherits:
-
Object
- Object
- Dionysus::Consumer::BatchEventsPublisher
- Defined in:
- lib/dionysus/consumer/batch_events_publisher.rb
Instance Method Summary collapse
-
#initialize(config, topic) ⇒ BatchEventsPublisher
constructor
A new instance of BatchEventsPublisher.
- #publish(processed_events) ⇒ Object
Constructor Details
#initialize(config, topic) ⇒ BatchEventsPublisher
Returns a new instance of BatchEventsPublisher.
7 8 9 10 |
# File 'lib/dionysus/consumer/batch_events_publisher.rb', line 7 def initialize(config, topic) @config = config @topic = topic end |
Instance Method Details
#publish(processed_events) ⇒ Object
12 13 14 15 16 17 18 |
# File 'lib/dionysus/consumer/batch_events_publisher.rb', line 12 def publish(processed_events) processed_events .map { |dionysus_event| to_event_data(dionysus_event) } .select(&:present?) # potentially necessary for some Kafka versions? couldn't reproduce this reliably, but the integration spec added a tombstone message for `publish_events_batch_via_event_bus` in the older version .then { |mapped_events| publish_events_batch_via_event_bus(mapped_events) } .then { |mapped_events| mapped_events.each { |event_data| publish_single_event_via_event_bus(event_data) } } end |