Module: Karafka::Pro::ScheduledMessages::Proxy
Defined in: lib/karafka/pro/scheduled_messages/proxy.rb
Overview
Proxy used to wrap the scheduled messages with the correct dispatch envelope. Each message that goes to the scheduler topic needs to have specific headers and other details that are required by the system so we know how and when to dispatch it.
Each message that goes to the proxy topic needs a unique key. We inject one automatically unless the user provides one in the envelope. Because messages dispatched by the user with the same key should all go to the same partition, we inject a partition_key based on the user key or other details if present. This ensures that they always go to the same partition on our side.
This wrapper validates the initial message that the user wants to send in the future, as well as the envelope and the specific requirements for a message to be sent in the future.
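The key handling described above can be sketched as follows. This is a minimal, standalone illustration and not Karafka's actual code: the helper name `inject_keys`, the generated key format, and the choice to derive `partition_key` from the user message's `:partition_key` or `:key` are all assumptions made for the example.

```ruby
require 'securerandom'

# Hypothetical helper (not part of Karafka): fills in :key and :partition_key
# on the proxy message unless the envelope already supplied a key.
def inject_keys(proxy_message, message)
  # An envelope-provided key wins; nothing is injected in that case
  return proxy_message if proxy_message.key?(:key)

  # Assumed format; any unique value would serve the same purpose
  proxy_message[:key] = "#{message[:topic]}-#{SecureRandom.uuid}"

  # Derive a stable partition key from the user message when possible, so
  # messages sharing a user key map to the same scheduler partition
  base = message[:partition_key] || message[:key]
  proxy_message[:partition_key] ||= base.to_s if base

  proxy_message
end

user_message = { topic: 'orders', key: 'user-42', payload: 'hello' }

first = inject_keys({ payload: 'hello' }, user_message)
second = inject_keys({ payload: 'hello' }, user_message)
with_key = inject_keys({ key: 'custom', payload: 'hello' }, user_message)
```

In this sketch `first` and `second` receive different unique keys but the same `partition_key`, while `with_key` is left untouched because the envelope already carried a key.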
Class Method Summary
- .cancel(key:, envelope: {}) ⇒ Hash
  Generates a tombstone message to cancel already scheduled message dispatch.
- .schedule(message:, epoch:, envelope: {}) ⇒ Hash
  Generates a schedule message envelope wrapping the original dispatch.
- .tombstone(message:) ⇒ Object
  Builds tombstone with the dispatched message details.
Class Method Details
.cancel(key:, envelope: {}) ⇒ Hash
Technically this is a tombstone, but we mark it with a distinct source type so that it can be told apart when debugging.
Generates a tombstone message to cancel already scheduled message dispatch
# File 'lib/karafka/pro/scheduled_messages/proxy.rb', line 103

def cancel(key:, envelope: {})
  proxy_message = {
    key: key,
    payload: nil,
    headers: {
      'schedule_schema_version' => ScheduledMessages::SCHEMA_VERSION,
      'schedule_source_type' => 'cancel'
    }
  }.merge(envelope)

  # Ensure user provided envelope is with all expected details
  validate!(proxy_message)

  proxy_message
end
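The envelope is applied with `Hash#merge`, so its top-level keys take precedence over the generated ones. The standalone sketch below reproduces only the hash construction, without the validation step; `build_cancel`, the schema version value, and the topic name are placeholders chosen for illustration.

```ruby
# Stand-in for the hash construction in .cancel, without validation.
# The schema_version default is a placeholder, not the real constant value.
def build_cancel(key:, envelope: {}, schema_version: '1.0.0')
  {
    key: key,
    payload: nil,
    headers: {
      'schedule_schema_version' => schema_version,
      'schedule_source_type' => 'cancel'
    }
  }.merge(envelope)
end

# Typical case: the envelope only adds the scheduler topic (name is hypothetical)
plain = build_cancel(key: 'orders-123', envelope: { topic: 'scheduled_messages' })

# Hash#merge is shallow: an envelope that carries its own :headers replaces
# the generated headers hash as a whole instead of being merged into it
overridden = build_cancel(key: 'orders-123', envelope: { headers: { 'x' => '1' } })
```

The second call shows why an envelope that sets `:headers` needs care: in this sketch the schedule headers are lost entirely.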
.schedule(message:, epoch:, envelope: {}) ⇒ Hash
This proxy does not inject the dispatched messages topic unless it is provided in the envelope. That’s because a user can have multiple scheduled messages topics to group outgoing messages, etc.
Generates a schedule message envelope wrapping the original dispatch
# File 'lib/karafka/pro/scheduled_messages/proxy.rb', line 67

def schedule(message:, epoch:, envelope: {})
  # We need to ensure that the message we want to proxy is fully legit. Otherwise, since
  # we envelope details like target topic, we could end up having incorrect data to
  # schedule
  MSG_CONTRACT.validate!(message, WaterDrop::Errors::MessageInvalidError)

  headers = (message[:headers] || {}).merge(
    'schedule_schema_version' => ScheduledMessages::SCHEMA_VERSION,
    'schedule_target_epoch' => epoch.to_i.to_s,
    'schedule_source_type' => 'schedule'
  )

  export(headers, message, :topic)
  export(headers, message, :partition)
  export(headers, message, :key)
  export(headers, message, :partition_key)

  proxy_message = {
    payload: message[:payload],
    headers: headers
  }.merge(envelope)

  enrich(proxy_message, message)
  validate!(proxy_message)

  proxy_message
end
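To make the resulting structure concrete, here is a standalone sketch of the header construction performed by `.schedule`. It is not the library code: validation and key enrichment are omitted, and the behavior attributed to `export` (copying an attribute into a `schedule_target_<name>` header when present) is an assumption, as are `build_schedule`, the schema version value, and the topic names.

```ruby
# Stand-in for the envelope construction in .schedule, without validation or
# key enrichment. The header naming for exported attributes is assumed.
def build_schedule(message:, epoch:, envelope: {}, schema_version: '1.0.0')
  headers = (message[:headers] || {}).merge(
    'schedule_schema_version' => schema_version,
    'schedule_target_epoch' => epoch.to_i.to_s,
    'schedule_source_type' => 'schedule'
  )

  # Copy routing details of the original message into headers when present
  %i[topic partition key partition_key].each do |attribute|
    next unless message.key?(attribute)

    headers["schedule_target_#{attribute}"] = message.fetch(attribute).to_s
  end

  { payload: message[:payload], headers: headers }.merge(envelope)
end

dispatch_at = Time.at(1_700_000_000)

proxy = build_schedule(
  message: { topic: 'orders', key: 'user-42', payload: 'hello' },
  epoch: dispatch_at,
  envelope: { topic: 'scheduled_messages' }
)
```

Because the epoch is converted with `to_i`, both a `Time` and an `Integer` are accepted. The original target topic travels in the headers, while the `:topic` of the proxy message itself comes only from the envelope.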
.tombstone(message:) ⇒ Object
Builds tombstone with the dispatched message details. Those details can be used in the Web UI, etc., when analyzing dispatches.
# File 'lib/karafka/pro/scheduled_messages/proxy.rb', line 123

def tombstone(message:)
  {
    key: message.key,
    payload: nil,
    topic: message.topic,
    partition: message.partition,
    headers: message.raw_headers.merge(
      'schedule_schema_version' => ScheduledMessages::SCHEMA_VERSION,
      'schedule_source_type' => 'tombstone',
      'schedule_source_offset' => message.offset.to_s
    )
  }
end
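The sketch below shows what such a tombstone could look like for a given consumed message. It is a standalone illustration: the `Struct` is a hypothetical stand-in for the consumed message that models only the readers used above, and `build_tombstone`, the schema version value, and the sample data are assumptions.

```ruby
# Hypothetical stand-in for a consumed message; only the readers that
# .tombstone relies on are modeled
fake_message = Struct.new(:key, :topic, :partition, :offset, :raw_headers, keyword_init: true)

# Stand-in for .tombstone with a placeholder schema version
def build_tombstone(message:, schema_version: '1.0.0')
  {
    key: message.key,
    payload: nil,
    topic: message.topic,
    partition: message.partition,
    headers: message.raw_headers.merge(
      'schedule_schema_version' => schema_version,
      'schedule_source_type' => 'tombstone',
      'schedule_source_offset' => message.offset.to_s
    )
  }
end

dispatched = fake_message.new(
  key: 'orders-123',
  topic: 'scheduled_messages',
  partition: 2,
  offset: 57,
  raw_headers: { 'schedule_source_type' => 'schedule', 'schedule_target_topic' => 'orders' }
)

tombstone = build_tombstone(message: dispatched)
```

The original headers are kept, so details such as the target topic remain available, while `schedule_source_type` is overwritten with `tombstone` and the source offset is recorded as a string.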