Class: JetstreamBridge::OutboxRepository

Inherits:
Object
  • Object
show all
Defined in:
lib/jetstream_bridge/publisher/outbox_repository.rb

Overview

Encapsulates ActiveRecord-backed persistence operations for outbox records.

Instance Method Summary collapse

Constructor Details

#initialize(klass) ⇒ OutboxRepository

Returns a new instance of OutboxRepository.



9
10
11
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 9

# Builds a repository bound to a single outbox model class.
#
# @param klass [Class] model class that #find_or_build looks records up on
#   and instantiates via +klass.new+
def initialize(klass)
  @klass = klass
end

Instance Method Details

#already_sent?(record) ⇒ Boolean

Returns:

  • (Boolean)


33
34
35
36
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 33

# Tells whether +record+ is already marked as delivered.
#
# A record counts as sent when it exposes a truthy +sent_at+, or when it
# exposes a +status+ equal to 'sent'. Either accessor may be absent.
#
# @param record [Object] outbox record (duck-typed)
# @return [Object] the truthy +sent_at+ value when present; otherwise
#   +true+/+false+ from the status comparison
def already_sent?(record)
  delivered_at = record.respond_to?(:sent_at) && record.sent_at
  return delivered_at if delivered_at

  record.respond_to?(:status) && record.status == 'sent'
end

#find_or_build(event_id) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 13

# Looks up the outbox record for +event_id+ (or builds an unsaved one) and
# takes a row lock on records that already exist.
#
# The lookup is delegated to ModelUtils.find_or_init_by_best, keyed on
# +event_id+ with +dedup_key+ as the alternative column.
#
# @param event_id [Object] identifier of the event being published
# @return [Object] the located (and locked, when possible) or newly built record
def find_or_build(event_id)
  lookup = lambda do
    ModelUtils.find_or_init_by_best(
      @klass,
      { event_id: event_id },
      { dedup_key: event_id } # fallback if app uses a different unique column
    )
  end

  record = lookup.call

  # ActiveRecord's persisted? is already false for new records, so a separate
  # new_record? check adds nothing.
  return record unless record.persisted? && record.respond_to?(:lock!)

  begin
    # NOTE(review): a row lock presumably only holds for the lifetime of an
    # enclosing transaction - confirm that callers open one.
    record.lock!
    record
  rescue ActiveRecord::RecordNotFound
    # The row was deleted between find and lock. Repeat the lookup instead of
    # returning a blank @klass.new, so the replacement goes through the same
    # helper and keeps whatever key attribute that helper assigns.
    lookup.call
  end
end

#persist_exception(record, error) ⇒ Object



81
82
83
84
85
86
87
88
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 81

# Best-effort recording of an exception on an outbox record.
#
# Formats +error+ as "ClassName: message" and hands it to #persist_failure.
# Any StandardError raised while doing so is logged as a warning and not
# propagated. Does nothing when +record+ is nil/false.
#
# @param record [Object, nil] outbox record to mark as failed
# @param error [Exception] the exception to record
# @return [Object, nil] result of #persist_failure, the logger's return value
#   when persisting failed, or nil when there is no record
def persist_exception(record, error)
  return unless record

  begin
    summary = "#{error.class}: #{error.message}"
    persist_failure(record, summary)
  rescue StandardError => failure
    Logging.warn(
      "Failed to persist outbox failure: #{failure.class}: #{failure.message}",
      tag: 'JetstreamBridge::Publisher'
    )
  end
end

#persist_failure(record, message) ⇒ Object



71
72
73
74
75
76
77
78
79
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 71

# Marks +record+ as failed and saves it inside a transaction.
#
# Sets +status+ to 'failed' and +last_error+ to +message+; +updated_at+ is
# stamped with the current UTC time only when the record exposes it.
# Attribute assignment goes through ModelUtils.assign_known_attrs.
#
# @param record [Object] outbox record to update
# @param message [String] failure description stored in +last_error+
# @return [Object] whatever the transaction block yields (+record.save!+)
# @raise propagates any error from +save!+
def persist_failure(record, message)
  ActiveRecord::Base.transaction do
    changes = { status: 'failed', last_error: message }
    changes[:updated_at] = Time.now.utc if record.respond_to?(:updated_at)

    ModelUtils.assign_known_attrs(record, changes)
    record.save!
  end
end

#persist_pre(record, subject, envelope) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 38

# Records the "about to publish" state on +record+ and saves it inside a
# transaction.
#
# Always assigns the event id (stringified from envelope['event_id']), the
# subject, the serialized envelope as payload, a headers document containing
# 'nats-msg-id', status 'publishing', and clears +last_error+. When the
# record exposes them, it also bumps +attempts+, keeps or initializes
# +enqueued_at+, and stamps +updated_at+ with the current UTC time.
#
# @param record [Object] outbox record to update
# @param subject [String] subject the event is published on
# @param envelope [Hash] event envelope; read via the string key 'event_id'
# @return [Object] whatever the transaction block yields (+record.save!+)
# @raise propagates any error from +save!+
def persist_pre(record, subject, envelope)
  ActiveRecord::Base.transaction do
    stamp = Time.now.utc
    id    = envelope['event_id'].to_s

    fields = {
      event_id: id,
      subject: subject,
      payload: ModelUtils.json_dump(envelope),
      headers: ModelUtils.json_dump({ 'nats-msg-id' => id }),
      status: 'publishing',
      last_error: nil
    }

    # Optional columns: only touched when the model actually has them.
    if record.respond_to?(:attempts)
      previous = record.attempts || 0
      fields[:attempts] = 1 + previous
    end
    if record.respond_to?(:enqueued_at)
      # Keep the first enqueue time across retries.
      fields[:enqueued_at] = record.enqueued_at || stamp
    end
    fields[:updated_at] = stamp if record.respond_to?(:updated_at)

    ModelUtils.assign_known_attrs(record, fields)
    record.save!
  end
end

#persist_success(record) ⇒ Object



60
61
62
63
64
65
66
67
68
69
# File 'lib/jetstream_bridge/publisher/outbox_repository.rb', line 60

# Marks +record+ as sent and saves it inside a transaction.
#
# Sets +status+ to 'sent'. +sent_at+ and +updated_at+ receive the same
# current UTC timestamp, each only when the record exposes that attribute.
# Attribute assignment goes through ModelUtils.assign_known_attrs.
#
# @param record [Object] outbox record to update
# @return [Object] whatever the transaction block yields (+record.save!+)
# @raise propagates any error from +save!+
def persist_success(record)
  ActiveRecord::Base.transaction do
    stamp   = Time.now.utc
    updates = { status: 'sent' }

    %i[sent_at updated_at].each do |column|
      updates[column] = stamp if record.respond_to?(column)
    end

    ModelUtils.assign_known_attrs(record, updates)
    record.save!
  end
end