Module: Karafka::Pro::ScheduledMessages::Proxy

Defined in:
lib/karafka/pro/scheduled_messages/proxy.rb

Overview

Proxy that wraps scheduled messages in the correct dispatch envelope. Each message sent to the scheduler topic must carry specific headers and other details that the system needs in order to know how and when to dispatch it.

Each message that goes to the proxy topic needs to have a unique key. We inject one automatically unless the user provides one in the envelope. Because we want messages dispatched by the user with the same key to go to the same partition, we inject a partition_key based on the user key or other details, if present. This ensures that they always go to the same partition on our side.

This wrapper validates the initial message that the user wants to send in the future, as well as the envelope and the specific requirements for a message to be sent in the future.
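The key handling described above can be sketched roughly as follows. This is a minimal illustration, not the library's implementation: `inject_keys`, the generated key format, and the `scheduled_messages` topic name are all assumptions made for the example.

```ruby
require 'securerandom'

# Hypothetical helper (not part of Karafka): fills in the envelope key and
# partition_key in the way the overview describes.
def inject_keys(proxy_message, message)
  # A key supplied in the envelope always wins
  return proxy_message if proxy_message.key?(:key)

  # Assumed format; any unique string would serve the same purpose
  proxy_message[:key] = "#{message[:topic]}-#{SecureRandom.uuid}"

  # Reuse the user's key as the partition_key so that messages sharing a key
  # land on the same partition of the scheduler topic
  if message.key?(:key) && !proxy_message.key?(:partition_key)
    proxy_message[:partition_key] = message[:key].to_s
  end

  proxy_message
end

first = inject_keys(
  { topic: 'scheduled_messages' },
  { topic: 'orders', key: 'user-1', payload: 'a' }
)
second = inject_keys(
  { topic: 'scheduled_messages' },
  { topic: 'orders', key: 'user-1', payload: 'b' }
)
explicit = inject_keys(
  { topic: 'scheduled_messages', key: 'job-1' },
  { topic: 'orders', key: 'user-1', payload: 'c' }
)
```

In this sketch `first` and `second` receive different envelope keys but the same partition_key, while `explicit` keeps the key given in its envelope and is otherwise left untouched.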

Class Method Details

.cancel(key:, envelope: {}) ⇒ Hash

Note:

Technically it is a tombstone, but we give it a distinct source type to make debugging easier if needed.

Generates a tombstone message that cancels an already scheduled message dispatch.

Parameters:

  • key (String)

    key used by the original message as a unique identifier

  • envelope (Hash) (defaults to: {})

    Special details that can identify the message location like topic and partition (if used) so the cancellation goes to the correct location.

Returns:

  • (Hash)

    cancellation message



# File 'lib/karafka/pro/scheduled_messages/proxy.rb', line 103

def cancel(key:, envelope: {})
  proxy_message = {
    key: key,
    payload: nil,
    headers: {
      'schedule_schema_version' => ScheduledMessages::SCHEMA_VERSION,
      'schedule_source_type' => 'cancel'
    }
  }.merge(envelope)

  # Ensure user provided envelope is with all expected details
  validate!(proxy_message)

  proxy_message
end
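To make the shape of the cancellation message concrete, here is a self-contained sketch that mirrors the method body above without the `validate!` step. `CANCEL_SCHEMA_VERSION` is a stand-in for `ScheduledMessages::SCHEMA_VERSION` (the real value may differ), and `build_cancel` and the topic name are hypothetical. It also shows a consequence of `Hash#merge` being shallow: a `headers` entry in the envelope replaces the generated headers instead of being combined with them.

```ruby
# Stand-in for ScheduledMessages::SCHEMA_VERSION; the real value may differ
CANCEL_SCHEMA_VERSION = '1.0.0'

# Mirrors the body of .cancel, minus validation (hypothetical helper name)
def build_cancel(key:, envelope: {})
  {
    key: key,
    payload: nil,
    headers: {
      'schedule_schema_version' => CANCEL_SCHEMA_VERSION,
      'schedule_source_type' => 'cancel'
    }
  }.merge(envelope)
end

# Typical use: the envelope only points at the scheduler topic and partition
cancellation = build_cancel(
  key: 'order-42',
  envelope: { topic: 'scheduled_messages', partition: 0 }
)

# Shallow merge: envelope headers replace the generated ones wholesale
clobbered = build_cancel(
  key: 'order-42',
  envelope: { topic: 'scheduled_messages', headers: { 'trace' => 'abc' } }
)
```

In the second call the resulting headers contain only `'trace'`, so the schedule headers are gone. Keeping `headers` out of the envelope avoids this.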

.schedule(message:, epoch:, envelope: {}) ⇒ Hash

Note:

This proxy does not inject the dispatched messages topic unless it is provided in the envelope. That is because a user can have multiple scheduled messages topics, for example to group outgoing messages.

Generates a schedule message envelope wrapping the original dispatch

Parameters:

  • message (Hash)

    message hash of a message that would originally go to WaterDrop producer directly.

  • epoch (Integer)

    time (now or in the future) at which to dispatch this message, as a Unix epoch timestamp

  • envelope (Hash) (defaults to: {})

    Special details that the envelope needs to have, like a unique key. If a unique key is not provided, we build a random unique one and use a partition_key based on the original message key (if present) to ensure that all relevant messages are dispatched to the same topic partition.

Returns:

  • (Hash)

    dispatched message wrapped with an envelope



# File 'lib/karafka/pro/scheduled_messages/proxy.rb', line 67

def schedule(message:, epoch:, envelope: {})
  # We need to ensure that the message we want to proxy is fully legit. Otherwise, since
  # we envelope details like target topic, we could end up having incorrect data to
  # schedule
  MSG_CONTRACT.validate!(message, WaterDrop::Errors::MessageInvalidError)

  headers = (message[:headers] || {}).merge(
    'schedule_schema_version' => ScheduledMessages::SCHEMA_VERSION,
    'schedule_target_epoch' => epoch.to_i.to_s,
    'schedule_source_type' => 'schedule'
  )

  export(headers, message, :topic)
  export(headers, message, :partition)
  export(headers, message, :key)
  export(headers, message, :partition_key)

  proxy_message = {
    payload: message[:payload],
    headers: headers
  }.merge(envelope)

  enrich(proxy_message, message)
  validate!(proxy_message)

  proxy_message
end
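The header construction above can be tried out in isolation with the following sketch. It leaves out `MSG_CONTRACT`, `enrich` and `validate!`. The source of `export` is not shown on this page, so `export_attribute` below is only an assumed behaviour (copying an attribute into a `schedule_target_*` header as a string); `SCHEDULE_SCHEMA_VERSION`, `build_schedule` and the topic names are likewise stand-ins.

```ruby
# Stand-in for ScheduledMessages::SCHEMA_VERSION; the real value may differ
SCHEDULE_SCHEMA_VERSION = '1.0.0'

# Assumed behaviour of the export helper: copy the attribute into a
# "schedule_target_*" header as a string, only when the message has it
def export_attribute(headers, message, attribute)
  return unless message.key?(attribute)

  headers["schedule_target_#{attribute}"] = message.fetch(attribute).to_s
end

# Mirrors the body of .schedule without contract checks and enrichment
def build_schedule(message:, epoch:, envelope: {})
  # merge returns a new hash, so the caller's headers are not mutated
  headers = (message[:headers] || {}).merge(
    'schedule_schema_version' => SCHEDULE_SCHEMA_VERSION,
    'schedule_target_epoch' => epoch.to_i.to_s,
    'schedule_source_type' => 'schedule'
  )

  %i[topic partition key partition_key].each do |attribute|
    export_attribute(headers, message, attribute)
  end

  { payload: message[:payload], headers: headers }.merge(envelope)
end

message = {
  topic: 'orders',
  key: 'user-1',
  payload: '{"id":1}',
  headers: { 'trace' => 'abc' }
}

# A fractional epoch is truncated by to_i, giving "1893499200"
scheduled = build_schedule(
  message: message,
  epoch: 1_893_499_200.75,
  envelope: { topic: 'scheduled_messages', key: 'job-1' }
)
```

Here the original destination (`orders`) ends up only in the headers, while the top-level `topic` of the result is the scheduler topic taken from the envelope.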

.tombstone(message:) ⇒ Object

Builds a tombstone with the dispatched message details. Those details can be used in the Web UI, etc., when analyzing dispatches.

Parameters:

  • message (Karafka::Messages::Message)

    message we want to tombstone



# File 'lib/karafka/pro/scheduled_messages/proxy.rb', line 123

def tombstone(message:)
  {
    key: message.key,
    payload: nil,
    topic: message.topic,
    partition: message.partition,
    headers: message.raw_headers.merge(
      'schedule_schema_version' => ScheduledMessages::SCHEMA_VERSION,
      'schedule_source_type' => 'tombstone',
      'schedule_source_offset' => message.offset.to_s
    )
  }
end
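The following sketch shows what the resulting tombstone looks like for a given dispatched message. `FakeMessage` is a minimal stand-in for `Karafka::Messages::Message` that exposes only the readers used above, and `TOMBSTONE_SCHEMA_VERSION`, `build_tombstone` and the sample values are assumptions made for illustration.

```ruby
# Stand-in for ScheduledMessages::SCHEMA_VERSION; the real value may differ
TOMBSTONE_SCHEMA_VERSION = '1.0.0'

# Minimal stand-in for Karafka::Messages::Message with only the readers
# that .tombstone relies on
FakeMessage = Struct.new(
  :key, :topic, :partition, :offset, :raw_headers,
  keyword_init: true
)

# Mirrors the body of .tombstone (hypothetical helper name)
def build_tombstone(message)
  {
    key: message.key,
    payload: nil,
    topic: message.topic,
    partition: message.partition,
    # Original headers are kept; the schedule_* entries below override them
    headers: message.raw_headers.merge(
      'schedule_schema_version' => TOMBSTONE_SCHEMA_VERSION,
      'schedule_source_type' => 'tombstone',
      'schedule_source_offset' => message.offset.to_s
    )
  }
end

dispatched = FakeMessage.new(
  key: 'job-1',
  topic: 'scheduled_messages',
  partition: 2,
  offset: 1337,
  raw_headers: {
    'schedule_source_type' => 'schedule',
    'schedule_target_topic' => 'orders'
  }
)

tombstone = build_tombstone(dispatched)
```

The tombstone keeps the original headers (such as the target topic), overrides the source type, and records the offset of the message it replaces. Because `merge` returns a new hash, the headers of `dispatched` stay unchanged.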