Class: Dionysus::Consumer::BatchEventsPublisher

Inherits:
Object
  • Object
show all
Defined in:
lib/dionysus/consumer/batch_events_publisher.rb

Instance Method Summary collapse

Constructor Details

#initialize(config, topic) ⇒ BatchEventsPublisher

Returns a new instance of BatchEventsPublisher.



7
8
9
10
# File 'lib/dionysus/consumer/batch_events_publisher.rb', line 7

# Builds a publisher by storing both arguments on the instance unchanged.
#
# @param config [Object] kept as-is in @config
# @param topic [Object] kept as-is in @topic
def initialize(config, topic)
  @config, @topic = config, topic
end

Instance Method Details

#publish(processed_events) ⇒ Object



12
13
14
15
16
17
18
# File 'lib/dionysus/consumer/batch_events_publisher.rb', line 12

# Converts every processed event to event data, drops the blank results, and
# publishes what is left: first as a single batch, then one event at a time.
#
# @param processed_events [#map] events handed one by one to to_event_data
# @return [Object] the result of calling `each` on whatever
#   publish_events_batch_via_event_bus returned
def publish(processed_events)
  event_payloads = processed_events.map { |dionysus_event| to_event_data(dionysus_event) }

  # Blank payloads are filtered out before publishing. This is potentially
  # necessary for some Kafka versions: it could not be reproduced reliably, but
  # the integration spec added a tombstone message for
  # `publish_events_batch_via_event_bus` in the older version.
  publishable_payloads = event_payloads.select(&:present?)

  # The per-event loop runs over the batch publisher's return value, not over
  # publishable_payloads directly (presumably the same collection; confirm
  # against publish_events_batch_via_event_bus).
  batch_result = publish_events_batch_via_event_bus(publishable_payloads)
  batch_result.each { |event_data| publish_single_event_via_event_bus(event_data) }
end