Class: Journaled::Writer
- Inherits:
-
Object
- Object
- Journaled::Writer
- Defined in:
- app/models/journaled/writer.rb
Constant Summary collapse
- EVENT_METHOD_NAMES =
%i( journaled_schema_name journaled_partition_key journaled_attributes journaled_stream_name journaled_enqueue_opts ).freeze
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(journaled_event:) ⇒ Writer
constructor
A new instance of Writer.
- #journal! ⇒ Object
Constructor Details
#initialize(journaled_event:) ⇒ Writer
Returns a new instance of Writer.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'app/models/journaled/writer.rb', line 12 def initialize(journaled_event:) raise "An enqueued event must respond to: #{EVENT_METHOD_NAMES.to_sentence}" unless respond_to_all?(journaled_event, EVENT_METHOD_NAMES) unless journaled_event.journaled_schema_name.present? && journaled_event.journaled_partition_key.present? && journaled_event.journaled_attributes.present? raise <<~ERROR An enqueued event must have a non-nil response to: #json_schema_name, #partition_key, and #journaled_attributes ERROR end @journaled_event = journaled_event end |
Class Method Details
.delivery_perform_args(events) ⇒ Object
52 53 54 55 56 57 58 59 60 |
# File 'app/models/journaled/writer.rb', line 52 def self.delivery_perform_args(events) events.map do |event| { serialized_event: event.journaled_attributes.to_json, partition_key: event.journaled_partition_key, stream_name: event.journaled_stream_name, } end end |
.enqueue!(*events) ⇒ Object
41 42 43 44 45 46 47 48 49 50 |
# File 'app/models/journaled/writer.rb', line 41 def self.enqueue!(*events) events.group_by(&:journaled_enqueue_opts).each do |enqueue_opts, batch| job_opts = enqueue_opts.reverse_merge(priority: Journaled.job_priority) ActiveSupport::Notifications.instrument('journaled.batch.enqueue', batch: batch, **job_opts) do Journaled::DeliveryJob.set(job_opts).perform_later(*delivery_perform_args(batch)) batch.each { |event| ActiveSupport::Notifications.instrument('journaled.event.enqueue', event: event, **job_opts) } end end end |
Instance Method Details
#journal! ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 |
# File 'app/models/journaled/writer.rb', line 29 def journal! validate! ActiveSupport::Notifications.instrument('journaled.event.stage', event: journaled_event, **journaled_enqueue_opts) do if Journaled::Connection.available? Journaled::Connection.stage!(journaled_event) else self.class.enqueue!(journaled_event) end end end |