Class: JetstreamBridge::InboxRepository

Inherits:
Object
  • Object
show all
Defined in:
lib/jetstream_bridge/consumer/inbox/inbox_repository.rb

Overview

AR-facing operations for inbox rows (find/build/persist).

Instance Method Summary collapse

Constructor Details

#initialize(klass) ⇒ InboxRepository

Returns a new instance of InboxRepository.



9
10
11
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 9

def initialize(klass)
  @klass = klass
end

Instance Method Details

#already_processed?(record) ⇒ Boolean

Returns:

  • (Boolean)


25
26
27
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 25

def already_processed?(record)
  record.respond_to?(:processed_at) && record.processed_at
end

#find_or_build(msg) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 13

def find_or_build(msg)
  record = if ModelUtils.has_columns?(@klass, :event_id)
             @klass.find_or_initialize_by(event_id: msg.event_id)
           elsif ModelUtils.has_columns?(@klass, :stream_seq)
             @klass.find_or_initialize_by(stream_seq: msg.seq)
           else
             @klass.new
           end

  lock_record(record)
end

#lock_record(record) ⇒ Object



80
81
82
83
84
85
86
87
88
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 80

def lock_record(record)
  return record unless record.respond_to?(:persisted?) && record.persisted?
  return record unless record.respond_to?(:lock!)

  record.lock!
  record
rescue ActiveRecord::RecordNotFound
  @klass.new
end

#persist_failure(record, error) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 62

def persist_failure(record, error)
  return unless record

  ActiveRecord::Base.transaction do
    now = Time.now.utc
    attrs = {
      status: 'failed',
      last_error: "#{error.class}: #{error.message}",
      updated_at: record.respond_to?(:updated_at) ? now : nil
    }
    ModelUtils.assign_known_attrs(record, attrs)
    record.save!
  end
rescue StandardError => e
  Logging.warn("Failed to persist inbox failure: #{e.class}: #{e.message}",
               tag: 'JetstreamBridge::Consumer')
end

#persist_post(record) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 49

def persist_post(record)
  ActiveRecord::Base.transaction do
    now = Time.now.utc
    attrs = {
      status: 'processed',
      processed_at: record.respond_to?(:processed_at) ? now : nil,
      updated_at: record.respond_to?(:updated_at) ? now : nil
    }
    ModelUtils.assign_known_attrs(record, attrs)
    record.save!
  end
end

#persist_pre(record, msg) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/jetstream_bridge/consumer/inbox/inbox_repository.rb', line 29

def persist_pre(record, msg)
  ActiveRecord::Base.transaction do
    attrs = {
      event_id: msg.event_id,
      subject: msg.subject,
      payload: ModelUtils.json_dump(msg.body_for_store),
      headers: ModelUtils.json_dump(msg.headers),
      stream: msg.stream,
      stream_seq: msg.seq,
      deliveries: msg.deliveries,
      status: 'processing',
      last_error: nil,
      received_at: record.respond_to?(:received_at) ? (record.received_at || msg.now) : nil,
      updated_at: record.respond_to?(:updated_at) ? msg.now : nil
    }
    ModelUtils.assign_known_attrs(record, attrs)
    record.save!
  end
end