Module: Karafka::Pro::ScheduledMessages
- Defined in:
- lib/karafka/pro/scheduled_messages.rb,
lib/karafka/pro/scheduled_messages/day.rb,
lib/karafka/pro/scheduled_messages/proxy.rb,
lib/karafka/pro/scheduled_messages/state.rb,
lib/karafka/pro/scheduled_messages/errors.rb,
lib/karafka/pro/scheduled_messages/tracker.rb,
lib/karafka/pro/scheduled_messages/consumer.rb,
lib/karafka/pro/scheduled_messages/max_epoch.rb,
lib/karafka/pro/scheduled_messages/dispatcher.rb,
lib/karafka/pro/scheduled_messages/serializer.rb,
lib/karafka/pro/scheduled_messages/daily_buffer.rb,
lib/karafka/pro/scheduled_messages/setup/config.rb,
lib/karafka/pro/scheduled_messages/contracts/config.rb,
lib/karafka/pro/scheduled_messages/schema_validator.rb,
lib/karafka/pro/scheduled_messages/contracts/message.rb,
lib/karafka/pro/scheduled_messages/deserializers/headers.rb,
lib/karafka/pro/scheduled_messages/deserializers/payload.rb
Overview
This feature allows for proxying messages via a special topic that can dispatch them at a later time, hence scheduled messages. Such messages need to have a special format but aside from that they are regular Kafka messages.
This work was conceptually inspired by the Go scheduler: github.com/etf1/kafka-message-scheduler though I did not look at the implementation itself. Just the concept of daily in-memory scheduling.
Defined Under Namespace
Modules: Contracts, Deserializers, Errors, Proxy, SchemaValidator, Setup Classes: Consumer, DailyBuffer, Day, Dispatcher, MaxEpoch, Serializer, State, Tracker
Constant Summary collapse
- SCHEMA_VERSION =
Version of the schema we use for envelops in scheduled messages. We use it to detect any potential upgrades similar to other components of Karafka and to stop processing of incompatible versions
'1.0.0'
- STATES_SCHEMA_VERSION =
Version of the states schema. Used to publish per partition simple aggregated metrics that can be used for schedules reporting
'1.0.0'
Class Method Summary collapse
-
.cancel(**kwargs) ⇒ Hash
Generates a tombstone message to cancel given dispatch (if not yet happened).
- .post_setup(config) ⇒ Object
-
.pre_setup(config) ⇒ Object
Sets up additional config scope, validations and other things.
-
.schedule(**kwargs) ⇒ Hash
Runs the ‘Proxy.call`.
Class Method Details
.cancel(**kwargs) ⇒ Hash
Generates a tombstone message to cancel given dispatch (if not yet happened)
44 45 46 |
# File 'lib/karafka/pro/scheduled_messages.rb', line 44 def cancel(**kwargs) Proxy.cancel(**kwargs) end |
.post_setup(config) ⇒ Object
61 62 63 |
# File 'lib/karafka/pro/scheduled_messages.rb', line 61 def post_setup(config) RecurringTasks::Contracts::Config.new.validate!(config.to_h) end |
.pre_setup(config) ⇒ Object
Sets up additional config scope, validations and other things
53 54 55 56 57 58 |
# File 'lib/karafka/pro/scheduled_messages.rb', line 53 def pre_setup(config) # Expand the config with this feature specific stuff config.instance_eval do setting(:scheduled_messages, default: Setup::Config.config) end end |