18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
# File 'app/interactors/sbmt/outbox/create_outbox_batch.rb', line 18
def call
middlewares = Middleware::Builder.new(create_batch_middlewares)
middlewares.call(item_class, batch_attributes) do
attributes_to_insert = batch_attributes.map do |attributes|
event_key = attributes[:event_key]
partition_by = attributes.delete(:partition_by) || event_key
return Failure(:missing_partition_by) unless partition_by
return Failure(:missing_event_key) unless event_key
record = item_class.new(attributes)
res = item_class.config.partition_strategy
.new(partition_by, item_class.config.bucket_size)
.call
record.bucket = res.value! if res.success?
record.created_at ||= Time.zone.now
record.updated_at ||= record.created_at
record.attributes.reject { |_, value| value.nil? }
end
inserted_items = item_class.insert_all(attributes_to_insert, returning: [:id, :bucket])
inserted_items.rows.each do |(record_id, bucket)|
partition = item_class.bucket_partitions.fetch(bucket)
track_last_stored_id(record_id, partition)
track_counter(partition)
end
Success(inserted_items.rows.map(&:first))
rescue => e
Failure(e.message)
end
end
|