Class: JetstreamBridge::OutboxRepository

Inherits:
Object
  • Object
show all
Defined in:
lib/jetstream_bridge/publisher/outbox_repository.rb

Overview

Encapsulates ActiveRecord-backed persistence operations for outbox records.

Instance Method Summary collapse

Constructor Details

#initialize(klass) ⇒ OutboxRepository



9
10
11
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 9

# Stores the model class that this repository operates on.
#
# @param klass [Class] class used for outbox rows; find_or_build passes it to
#   ModelUtils.find_or_init_by_best and calls +new+ on it
#   (presumably an ActiveRecord model — confirm against callers)
def initialize(klass)
  @klass = klass
end

Instance Method Details

#already_sent?(record) ⇒ Boolean



33
34
35
36
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 33

# Tells whether the given outbox record has already been published.
#
# A record counts as sent when it exposes a truthy +sent_at+, or when it
# exposes a +status+ equal to the string 'sent'. Records exposing neither
# attribute are treated as not sent.
#
# Always returns a strict boolean; the raw +sent_at+ value is never leaked
# to the caller.
#
# @param record [Object] outbox record (duck-typed via respond_to?)
# @return [Boolean] true when the record is marked as sent, false otherwise
def already_sent?(record)
  # A truthy sent_at wins regardless of what status says.
  return true if record.respond_to?(:sent_at) && record.sent_at

  record.respond_to?(:status) && record.status == 'sent'
end

#find_or_build(event_id) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 13

# Looks up the outbox record for an event, or initializes one, and locks the
# row when the record already exists.
#
# The lookup is delegated to ModelUtils.find_or_init_by_best with +event_id+
# as the primary criterion and +dedup_key+ as the fallback criterion.
#
# NOTE(review): a row lock taken by lock! normally lasts only for the
# enclosing database transaction — confirm that callers wrap this call in one.
# NOTE(review): the replacement record built after RecordNotFound is created
# with no attributes, so it does not carry event_id here — presumably the
# publish-attempt step fills it in; verify.
#
# @param event_id [Object] identifier of the event being published
# @return [Object] the found (and locked) record, or a new unsaved instance
def find_or_build(event_id)
  primary_lookup  = { event_id: event_id }
  fallback_lookup = { dedup_key: event_id } # fallback if app uses a different unique column
  found = ModelUtils.find_or_init_by_best(@klass, primary_lookup, fallback_lookup)

  # Only rows that already exist (and support locking) need a row lock.
  lockable = found.persisted? && !found.new_record? && found.respond_to?(:lock!)
  return found unless lockable

  begin
    found.lock!
    found
  rescue ActiveRecord::RecordNotFound
    # Row was deleted between the lookup and the lock; start from a blank record.
    @klass.new
  end
end

#record_publish_attempt(record, subject, envelope) ⇒ Object



38
39
40
41
42
43
44
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 38

# Persists the attributes describing a publish attempt on the record.
#
# The attributes come from build_publish_attrs, are applied through
# ModelUtils.assign_known_attrs, and the record is saved with save!, all
# inside a single ActiveRecord::Base.transaction block.
#
# @param record [Object] outbox record to update
# @param subject [Object] subject forwarded to build_publish_attrs
# @param envelope [Object] envelope forwarded to build_publish_attrs
# @return [Object] whatever the transaction block yields (the save! result)
def record_publish_attempt(record, subject, envelope)
  ActiveRecord::Base.transaction do
    ModelUtils.assign_known_attrs(
      record,
      build_publish_attrs(record, subject, envelope)
    )
    record.save!
  end
end

#record_publish_exception(record, error) ⇒ Object



67
68
69
70
71
72
73
74
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 67

# Best-effort persistence of an exception raised while publishing.
#
# Does nothing when +record+ is nil/false. Otherwise formats the error as
# "<ErrorClass>: <message>" and hands it to record_publish_failure. Any
# StandardError raised while doing so is logged as a warning via
# Logging.warn and is not re-raised.
#
# @param record [Object, nil] outbox record to mark as failed
# @param error [Exception] the error raised during publishing
# @return [Object, nil] nil when there is no record; otherwise the result of
#   record_publish_failure, or of Logging.warn when persisting failed
def record_publish_exception(record, error)
  if record
    description = "#{error.class}: #{error.message}"
    record_publish_failure(record, description)
  end
rescue StandardError => e
  warning = "Failed to persist outbox failure: #{e.class}: #{e.message}"
  Logging.warn(warning, tag: 'JetstreamBridge::Publisher')
end

#record_publish_failure(record, message) ⇒ Object



57
58
59
60
61
62
63
64
65
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 57

# Marks the record as failed and stores the error message.
#
# Sets +status+ to 'failed' and +last_error+ to the given message; when the
# record responds to +updated_at+, that is set to the current UTC time too.
# Attributes are applied through ModelUtils.assign_known_attrs and the record
# is saved with save! inside an ActiveRecord::Base.transaction block.
#
# @param record [Object] outbox record to update
# @param message [String] description of the failure
# @return [Object] whatever the transaction block yields (the save! result)
def record_publish_failure(record, message)
  ActiveRecord::Base.transaction do
    changes = { status: 'failed', last_error: message }
    changes[:updated_at] = Time.now.utc if record.respond_to?(:updated_at)

    ModelUtils.assign_known_attrs(record, changes)
    record.save!
  end
end

#record_publish_success(record) ⇒ Object



46
47
48
49
50
51
52
53
54
55
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 46

# Marks the record as successfully published.
#
# Sets +status+ to 'sent'; +sent_at+ and +updated_at+ are each set to the
# same current UTC time, but only when the record responds to them.
# Attributes are applied through ModelUtils.assign_known_attrs and the record
# is saved with save! inside an ActiveRecord::Base.transaction block.
#
# @param record [Object] outbox record to update
# @return [Object] whatever the transaction block yields (the save! result)
def record_publish_success(record)
  ActiveRecord::Base.transaction do
    stamp = Time.now.utc
    changes = { status: 'sent' }

    # One shared timestamp keeps sent_at and updated_at identical.
    %i[sent_at updated_at].each do |column|
      changes[column] = stamp if record.respond_to?(column)
    end

    ModelUtils.assign_known_attrs(record, changes)
    record.save!
  end
end