Class: Karafka::Pro::ScheduledMessages::DailyBuffer
- Inherits: Object
  - Object
  - Karafka::Pro::ScheduledMessages::DailyBuffer
- Defined in: lib/karafka/pro/scheduled_messages/daily_buffer.rb
Overview
Stores schedules for the current day and returns those that are due for dispatch. It uses a regular hash rather than a min-heap because schedules must be updatable by key and removable when they are cancelled. While removals could be implemented on a heap, updates with a different timestamp would be more complex. At the moment, a lookup across 8 640 000 messages (100 per second for a full day) takes up to 1.5 seconds, which is acceptable. Please ping me if you encounter performance issues with this naive implementation so it can be improved.
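The trade-off described above can be sketched with a plain Ruby hash. This is only an illustration of the storage model, not the Karafka source: the keys, epochs, and payload strings are made up, and the fixed `now` value stands in for the current time.

```ruby
# Plain-Hash sketch of the storage model: key => [epoch, payload]
accu = {}

# Schedule two messages under their keys
accu['order-1'] = [1_700_000_100, 'first version']
accu['order-2'] = [1_700_000_200, 'other message']

# Updating a schedule, even with a different timestamp, is a single overwrite
accu['order-1'] = [1_700_000_050, 'rescheduled']

# Cancelling a schedule is a single delete by key
accu.delete('order-2')

# Finding due messages is a linear scan over all stored values
now = 1_700_000_060 # fixed value for the example
due = accu.values.select { |epoch, _payload| epoch <= now }
```

With a min-heap ordered by epoch, the overwrite would require locating the old entry and restoring heap order, which is the complexity the overview refers to. The cost of the hash is the full scan on every lookup.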
Instance Method Summary
- #<<(message) ⇒ Object
  Adds a message to the buffer, or removes it from the buffer if it is a tombstone message for a given key.
- #delete(key) ⇒ Object
  Removes the given key from the accumulator.
- #for_dispatch {|epoch| ... } ⇒ Object
  Yields messages that should be dispatched (sent) to Kafka.
- #initialize ⇒ DailyBuffer (constructor)
  A new instance of DailyBuffer.
- #size ⇒ Integer
  Number of elements to schedule today.
Constructor Details
#initialize ⇒ DailyBuffer
Returns a new instance of DailyBuffer.
    # File 'lib/karafka/pro/scheduled_messages/daily_buffer.rb', line 25

    def initialize
      @accu = {}
    end
Instance Method Details
#<<(message) ⇒ Object
Only messages for the given day should be added here.
Adds a message to the buffer, or removes it from the buffer if it is a tombstone message for a given key.
    # File 'lib/karafka/pro/scheduled_messages/daily_buffer.rb', line 40

    def <<(message)
      # Non schedule are only tombstones and cancellations
      schedule = message.headers['schedule_source_type'] == 'schedule'

      key = message.key

      if schedule
        epoch = message.headers['schedule_target_epoch']
        @accu[key] = [epoch, message]
      else
        @accu.delete(key)
      end
    end
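A minimal usage sketch of the behaviour above. Karafka Pro is not loaded here, so `SketchBuffer` is a trimmed stand-in that copies the documented method, and `FakeMessage` is a hypothetical struct exposing only `key` and `headers`. The integer epoch and the `'cancel'` header value are assumptions for illustration; any value other than `'schedule'` takes the removal branch.

```ruby
# Hypothetical stand-in for a Karafka message; only #key and #headers are used
FakeMessage = Struct.new(:key, :headers)

# Trimmed copy of the documented behaviour so the sketch runs without Karafka Pro
class SketchBuffer
  def initialize
    @accu = {}
  end

  def <<(message)
    if message.headers['schedule_source_type'] == 'schedule'
      @accu[message.key] = [message.headers['schedule_target_epoch'], message]
    else
      @accu.delete(message.key)
    end
  end

  def size
    @accu.size
  end
end

buffer = SketchBuffer.new

# First schedule for key "k1"
buffer << FakeMessage.new(
  'k1',
  { 'schedule_source_type' => 'schedule', 'schedule_target_epoch' => 1_700_000_000 }
)

# A second schedule with the same key overwrites the first one
buffer << FakeMessage.new(
  'k1',
  { 'schedule_source_type' => 'schedule', 'schedule_target_epoch' => 1_700_000_500 }
)
size_after_update = buffer.size

# A non-schedule message for the same key removes the entry
buffer << FakeMessage.new('k1', { 'schedule_source_type' => 'cancel' })
size_after_cancel = buffer.size
```

Because entries are keyed by the message key, a reschedule never leaves a stale copy behind, and a cancellation for an unknown key is a harmless no-op.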
#delete(key) ⇒ Object
Removes the given key from the accumulator.
    # File 'lib/karafka/pro/scheduled_messages/daily_buffer.rb', line 73

    def delete(key)
      @accu.delete(key)
    end
#for_dispatch {|epoch| ... } ⇒ Object
We yield the epoch alongside the message so that it does not have to be extracted again later. This simplifies the API.
Yields messages that should be dispatched (sent) to Kafka.
    # File 'lib/karafka/pro/scheduled_messages/daily_buffer.rb', line 61

    def for_dispatch
      dispatch = Time.now.to_i

      @accu.each_value do |epoch, message|
        next unless epoch <= dispatch

        yield(epoch, message)
      end
    end
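As shown in the source, `for_dispatch` only yields due entries and does not remove them, so the caller is expected to call `delete` once a message has been handled. The following is a sketch of one possible calling pattern, not the one Karafka itself uses. `SketchBuffer` and `FakeMessage` are hypothetical stand-ins so the example runs without Karafka Pro, and integer epochs are an assumption.

```ruby
# Hypothetical stand-in for a Karafka message; only #key and #headers are used
FakeMessage = Struct.new(:key, :headers)

# Stand-in mirroring the documented methods of DailyBuffer
class SketchBuffer
  def initialize
    @accu = {}
  end

  def <<(message)
    if message.headers['schedule_source_type'] == 'schedule'
      @accu[message.key] = [message.headers['schedule_target_epoch'], message]
    else
      @accu.delete(message.key)
    end
  end

  def delete(key)
    @accu.delete(key)
  end

  def size
    @accu.size
  end

  def for_dispatch
    dispatch = Time.now.to_i

    @accu.each_value do |epoch, message|
      next unless epoch <= dispatch

      yield(epoch, message)
    end
  end
end

now = Time.now.to_i
buffer = SketchBuffer.new

# One message already due, one scheduled an hour from now
buffer << FakeMessage.new(
  'due', { 'schedule_source_type' => 'schedule', 'schedule_target_epoch' => now - 10 }
)
buffer << FakeMessage.new(
  'later', { 'schedule_source_type' => 'schedule', 'schedule_target_epoch' => now + 3600 }
)

# Collect due entries first, so the hash is not modified while it is iterated
due = []
buffer.for_dispatch { |epoch, message| due << [epoch, message] }

due.each do |_epoch, message|
  # A real consumer would produce the message to Kafka at this point
  buffer.delete(message.key)
end
```

Collecting before deleting keeps the iteration and the mutation separate, which avoids relying on how Ruby handles hash changes during `each_value`.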
#size ⇒ Integer
Returns the number of elements to schedule today.
    # File 'lib/karafka/pro/scheduled_messages/daily_buffer.rb', line 30

    def size
      @accu.size
    end