Class: RubyEventStore::Outbox::Repository

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_event_store/outbox/repository.rb

Defined Under Namespace

Classes: Lock, Record

Constant Summary collapse

RECENTLY_LOCKED_DURATION =
10.minutes

Instance Method Summary collapse

Constructor Details

#initialize(database_url) ⇒ Repository

Returns a new instance of Repository.



116
117
118
119
120
121
# File 'lib/ruby_event_store/outbox/repository.rb', line 116

def initialize(database_url)
  ::ActiveRecord::Base.establish_connection(database_url) unless ::ActiveRecord::Base.connected?
  if ::ActiveRecord::Base.connection.adapter_name == "Mysql2"
    ::ActiveRecord::Base.connection.execute("SET SESSION innodb_lock_wait_timeout = 1;")
  end
end

Instance Method Details

#delete_enqueued_older_than(fetch_specification, duration, limit) ⇒ Object



143
144
145
146
147
148
149
150
151
152
# File 'lib/ruby_event_store/outbox/repository.rb', line 143

def delete_enqueued_older_than(fetch_specification, duration, limit)
  scope = Record.for_fetch_specification(fetch_specification).where("enqueued_at < ?", duration.ago)
  scope = scope.limit(limit).order(:id) unless limit == :all
  scope.delete_all
  :ok
rescue ::ActiveRecord::Deadlocked
  :deadlocked
rescue ::ActiveRecord::LockWaitTimeout
  :lock_timeout
end

#get_remaining_count(fetch_specification) ⇒ Object



127
128
129
# File 'lib/ruby_event_store/outbox/repository.rb', line 127

def get_remaining_count(fetch_specification)
  Record.remaining_for(fetch_specification).count
end

#mark_as_enqueued(record, now) ⇒ Object



139
140
141
# File 'lib/ruby_event_store/outbox/repository.rb', line 139

def mark_as_enqueued(record, now)
  record.update_column(:enqueued_at, now)
end

#obtain_lock_for_process(fetch_specification, process_uuid, clock:) ⇒ Object



131
132
133
# File 'lib/ruby_event_store/outbox/repository.rb', line 131

def obtain_lock_for_process(fetch_specification, process_uuid, clock:)
  Lock.obtain(fetch_specification, process_uuid, clock: clock)
end

#release_lock_for_process(fetch_specification, process_uuid) ⇒ Object



135
136
137
# File 'lib/ruby_event_store/outbox/repository.rb', line 135

def release_lock_for_process(fetch_specification, process_uuid)
  Lock.release(fetch_specification, process_uuid)
end

#retrieve_batch(fetch_specification, batch_size) ⇒ Object



123
124
125
# File 'lib/ruby_event_store/outbox/repository.rb', line 123

def retrieve_batch(fetch_specification, batch_size)
  Record.remaining_for(fetch_specification).order("id ASC").limit(batch_size).to_a
end