Class: Journaled::Writer

Inherits:
Object
  • Object
show all
Defined in:
app/models/journaled/writer.rb

Constant Summary collapse

EVENT_METHOD_NAMES =
%i(
  journaled_schema_name
  journaled_partition_key
  journaled_attributes
  journaled_stream_name
  journaled_enqueue_opts
).freeze

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(journaled_event:) ⇒ Writer

Returns a new instance of Writer.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'app/models/journaled/writer.rb', line 12

def initialize(journaled_event:)
  raise "An enqueued event must respond to: #{EVENT_METHOD_NAMES.to_sentence}" unless respond_to_all?(journaled_event, EVENT_METHOD_NAMES)

  unless journaled_event.journaled_schema_name.present? &&
      journaled_event.journaled_partition_key.present? &&
      journaled_event.journaled_attributes.present?
    raise <<~ERROR
      An enqueued event must have a non-nil response to:
        #json_schema_name,
        #partition_key, and
        #journaled_attributes
    ERROR
  end

  @journaled_event = journaled_event
end

Class Method Details

.delivery_perform_args(events) ⇒ Object



52
53
54
55
56
57
58
59
60
# File 'app/models/journaled/writer.rb', line 52

def self.delivery_perform_args(events)
  events.map do |event|
    {
      serialized_event: event.journaled_attributes.to_json,
      partition_key: event.journaled_partition_key,
      stream_name: event.journaled_stream_name,
    }
  end
end

.enqueue!(*events) ⇒ Object



41
42
43
44
45
46
47
48
49
50
# File 'app/models/journaled/writer.rb', line 41

def self.enqueue!(*events)
  events.group_by(&:journaled_enqueue_opts).each do |enqueue_opts, batch|
    job_opts = enqueue_opts.reverse_merge(priority: Journaled.job_priority)
    ActiveSupport::Notifications.instrument('journaled.batch.enqueue', batch: batch, **job_opts) do
      Journaled::DeliveryJob.set(job_opts).perform_later(*delivery_perform_args(batch))

      batch.each { |event| ActiveSupport::Notifications.instrument('journaled.event.enqueue', event: event, **job_opts) }
    end
  end
end

Instance Method Details

#journal!Object



29
30
31
32
33
34
35
36
37
38
39
# File 'app/models/journaled/writer.rb', line 29

def journal!
  validate!

  ActiveSupport::Notifications.instrument('journaled.event.stage', event: journaled_event, **journaled_enqueue_opts) do
    if Journaled::Connection.available?
      Journaled::Connection.stage!(journaled_event)
    else
      self.class.enqueue!(journaled_event)
    end
  end
end