Class: Karafka::Pro::ScheduledMessages::Dispatcher

Inherits: Object
Defined in:
lib/karafka/pro/scheduled_messages/dispatcher.rb

Overview

Dispatcher responsible for dispatching messages to their appropriate target topics and for dispatching the related auxiliary messages (tombstones, logs, state reports). All messages (aside from the ones users dispatch with the envelope) are sent via this dispatcher.

Messages are buffered and dispatched in batches to improve dispatch performance.
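
For orientation, a minimal lifecycle sketch (topic name and partition are hypothetical; in a real setup the scheduled messages consumer creates and drives the dispatcher itself):

# Illustrative only: the feature's consumer normally manages this lifecycle.
dispatcher = Karafka::Pro::ScheduledMessages::Dispatcher.new('scheduled_messages', 0)

# Buffer each due schedule message (this also buffers its tombstone)...
dispatcher << message

# ...then send the whole buffer to Kafka in sync batches.
dispatcher.flush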

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(topic, partition) ⇒ Dispatcher

Returns a new instance of Dispatcher.

Parameters:

  • topic (String)

    consumed topic name

  • partition (Integer)

    consumed partition



# File 'lib/karafka/pro/scheduled_messages/dispatcher.rb', line 28

def initialize(topic, partition)
  @topic = topic
  @partition = partition
  @buffer = []
  @serializer = Serializer.new
end

Instance Attribute Details

#buffer ⇒ Array<Hash> (readonly)

Returns buffer with message hashes for dispatch.

Returns:

  • (Array<Hash>)

    buffer with message hashes for dispatch



# File 'lib/karafka/pro/scheduled_messages/dispatcher.rb', line 24

def buffer
  @buffer
end

Instance Method Details

#<<(message) ⇒ Object

Note:

This method adds the message to the buffer; it does not dispatch it.

Note:

It also produces the needed tombstone event as well as an audit log message.

Prepares the scheduled message for dispatch to the target topic. Extracts all the "schedule_" details and prepares it so the dispatched message reaches the desired location with the expected attributes. Alongside that it actually builds 2 messages (1 if logs are off): a tombstone event matching the schedule so it is no longer valid, and a log message that carries the same data as the dispatched message. Helpful when debugging.

Parameters:

  • message (Karafka::Messages::Message)

    schedule message to be dispatched to its target topic

# File 'lib/karafka/pro/scheduled_messages/dispatcher.rb', line 46

def <<(message)
  target_headers = message.raw_headers.merge(
    'schedule_source_topic' => @topic,
    'schedule_source_partition' => @partition.to_s,
    'schedule_source_offset' => message.offset.to_s,
    'schedule_source_key' => message.key
  ).compact

  target = {
    payload: message.raw_payload,
    headers: target_headers
  }

  extract(target, message.headers, :topic)
  extract(target, message.headers, :partition)
  extract(target, message.headers, :key)
  extract(target, message.headers, :partition_key)

  @buffer << target

  # Tombstone message so this schedule is no longer in use and gets removed from Kafka
  # by Kafka itself during compaction. It will not cancel the dispatch (that already
  # happened) but it ensures the schedule is not sent again and is marked as dispatched.
  @buffer << Proxy.tombstone(message: message)
end
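
For illustration, assuming a schedule message consumed from partition 0 at offset 100 with key 'my-schedule-id' (all hypothetical values), the buffered dispatch hash would look roughly as below. The extract calls additionally move the target topic, partition, key and partition key details set by the scheduling envelope from the headers into the hash:

# Hypothetical shape of one buffered target message:
{
  payload: '{"user_id":1}', # original raw payload, unchanged
  headers: {
    'schedule_source_topic' => 'scheduled_messages',
    'schedule_source_partition' => '0',
    'schedule_source_offset' => '100',
    'schedule_source_key' => 'my-schedule-id'
    # ...plus any other raw headers of the consumed schedule message
  },
  topic: 'orders_events' # taken from the envelope headers by #extract
}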

#flush ⇒ Object

Sends all buffered messages to Kafka synchronously. We dispatch in sync batches to prevent overloading. When a transactional producer is in use, this will be wrapped in a transaction automatically.



# File 'lib/karafka/pro/scheduled_messages/dispatcher.rb', line 90

def flush
  until @buffer.empty?
    config.producer.produce_many_sync(
      # We can remove these from the dispatch buffer prior to sending because the
      # daily buffer evicts messages only once dispatch is successful
      @buffer.shift(config.flush_batch_size)
    )
  end
end
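
The batch size comes from the feature configuration. A hedged setup sketch, assuming flush_batch_size is exposed under the scheduled_messages config namespace (verify the exact path against your Karafka version):

class KarafkaApp < Karafka::App
  setup do |config|
    # Assumption: smaller batches mean more sync round trips but less
    # pressure on the producer per dispatch cycle.
    config.scheduled_messages.flush_batch_size = 500
  end
end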

#state(tracker) ⇒ Object

Note:

This is dispatched async because it’s just a statistical metric.

Builds and dispatches the state report message with the schedules' details.

Parameters:

  • tracker (Tracker)

    tracker with the schedules state details

# File 'lib/karafka/pro/scheduled_messages/dispatcher.rb', line 77

def state(tracker)
  config.producer.produce_async(
    topic: "#{@topic}#{config.states_postfix}",
    payload: @serializer.state(tracker),
    key: 'state',
    partition: @partition,
    headers: { 'zlib' => 'true' }
  )
end
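
Because the 'zlib' header is set, the state payload is compressed. A minimal consumer-side sketch for reading the states topic, assuming the serializer emits zlib-compressed JSON (parse_state is a hypothetical helper, not part of this API):

require 'json'
require 'zlib'

# Hypothetical helper: inflate the compressed payload, then parse it.
def parse_state(message)
  raw = message.raw_payload
  raw = Zlib::Inflate.inflate(raw) if message.headers['zlib'] == 'true'
  JSON.parse(raw)
end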