Module: Sequent::Migrations::Grouper
- Defined in:
- lib/sequent/migrations/grouper.rb
Constant Summary collapse
- GroupEndpoint =
Data.define(:partition_key, :aggregate_id) do def <=>(other) return unless other.is_a?(self.class) [partition_key, aggregate_id] <=> [other.partition_key, other.aggregate_id] end include Comparable def to_s "(#{partition_key}, #{aggregate_id})" end end
- PartitionData =
Data.define(:key, :original_size, :remaining_size, :lower_bound)
- UUID_COUNT =
2**128
- LOWEST_UUID =
number_to_uuid(0)
- HIGHEST_UUID =
number_to_uuid(UUID_COUNT - 1)
Class Method Summary collapse
-
.group_partitions(partitions, target_group_size) ⇒ Object
Generate approximately equally sized groups based on the events partition keys and the number of events per partition key.
- .number_to_uuid(number) ⇒ Object
- .uuid_to_number(uuid) ⇒ Object
Class Method Details
.group_partitions(partitions, target_group_size) ⇒ Object
Generate approximately equally sized groups based on the events partition keys and the number of events per partition key. Each group is defined by a lower bound (partition-key, aggregate-id) and upper bound (partition-key, aggregate-id) (inclusive).
For splitting a partition into equal sized groups the assumption is made that aggregate-ids and their events are equally distributed.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/sequent/migrations/grouper.rb', line 29 def self.group_partitions(partitions, target_group_size) return [] unless partitions.present? partitions = partitions.sort.map do |key, count| PartitionData.new(key:, original_size: count, remaining_size: count, lower_bound: 0) end partition = partitions.shift current_start = GroupEndpoint.new(partition.key, LOWEST_UUID) current_size = 0 result = [] while partition.present? if current_size + partition.remaining_size < target_group_size current_size += partition.remaining_size if partitions.empty? result << (current_start..GroupEndpoint.new(partition.key, HIGHEST_UUID)) break end partition = partitions.shift elsif current_size + partition.remaining_size == target_group_size result << (current_start..GroupEndpoint.new(partition.key, HIGHEST_UUID)) partition = partitions.shift break unless partition current_start = GroupEndpoint.new(partition.key, LOWEST_UUID) current_size = 0 else taken = target_group_size - current_size upper_bound = partition.lower_bound + (UUID_COUNT * taken / partition.original_size) result << (current_start..GroupEndpoint.new(partition.key, number_to_uuid(upper_bound - 1))) remaining_size = partition.remaining_size - taken partition = partition.with(remaining_size:, lower_bound: upper_bound) current_start = GroupEndpoint.new(partition.key, number_to_uuid(upper_bound)) current_size = 0 end end result end |
.number_to_uuid(number) ⇒ Object
74 75 76 77 78 79 |
# File 'lib/sequent/migrations/grouper.rb', line 74 def self.number_to_uuid(number) fail ArgumentError, number unless (0..UUID_COUNT - 1).include? number s = format('%032x', number) "#{s[0..7]}-#{s[8..11]}-#{s[12..15]}-#{s[16..19]}-#{s[20..]}" end |
.uuid_to_number(uuid) ⇒ Object
81 82 83 |
# File 'lib/sequent/migrations/grouper.rb', line 81 def self.uuid_to_number(uuid) Integer(uuid.gsub('-', ''), 16) end |