Class: RubyEventStore::Outbox::Repository
- Inherits:
-
Object
- Object
- RubyEventStore::Outbox::Repository
- Defined in:
- lib/ruby_event_store/outbox/repository.rb
Defined Under Namespace
Constant Summary collapse
- RECENTLY_LOCKED_DURATION =
10.minutes
Instance Method Summary collapse
- #delete_enqueued_older_than(fetch_specification, duration, limit) ⇒ Object
- #get_remaining_count(fetch_specification) ⇒ Object
-
#initialize(database_url) ⇒ Repository
constructor
A new instance of Repository.
- #mark_as_enqueued(record, now) ⇒ Object
- #obtain_lock_for_process(fetch_specification, process_uuid, clock:) ⇒ Object
- #release_lock_for_process(fetch_specification, process_uuid) ⇒ Object
- #retrieve_batch(fetch_specification, batch_size) ⇒ Object
Constructor Details
#initialize(database_url) ⇒ Repository
Returns a new instance of Repository.
116 117 118 119 120 121 |
# File 'lib/ruby_event_store/outbox/repository.rb', line 116 def initialize(database_url) ::ActiveRecord::Base.establish_connection(database_url) unless ::ActiveRecord::Base.connected? if ::ActiveRecord::Base.connection.adapter_name == "Mysql2" ::ActiveRecord::Base.connection.execute("SET SESSION innodb_lock_wait_timeout = 1;") end end |
Instance Method Details
#delete_enqueued_older_than(fetch_specification, duration, limit) ⇒ Object
143 144 145 146 147 148 149 150 151 152 |
# File 'lib/ruby_event_store/outbox/repository.rb', line 143 def delete_enqueued_older_than(fetch_specification, duration, limit) scope = Record.for_fetch_specification(fetch_specification).where("enqueued_at < ?", duration.ago) scope = scope.limit(limit).order(:id) unless limit == :all scope.delete_all :ok rescue ::ActiveRecord::Deadlocked :deadlocked rescue ::ActiveRecord::LockWaitTimeout :lock_timeout end |
#get_remaining_count(fetch_specification) ⇒ Object
127 128 129 |
# File 'lib/ruby_event_store/outbox/repository.rb', line 127 def get_remaining_count(fetch_specification) Record.remaining_for(fetch_specification).count end |
#mark_as_enqueued(record, now) ⇒ Object
139 140 141 |
# File 'lib/ruby_event_store/outbox/repository.rb', line 139 def mark_as_enqueued(record, now) record.update_column(:enqueued_at, now) end |
#obtain_lock_for_process(fetch_specification, process_uuid, clock:) ⇒ Object
131 132 133 |
# File 'lib/ruby_event_store/outbox/repository.rb', line 131 def obtain_lock_for_process(fetch_specification, process_uuid, clock:) Lock.obtain(fetch_specification, process_uuid, clock: clock) end |
#release_lock_for_process(fetch_specification, process_uuid) ⇒ Object
135 136 137 |
# File 'lib/ruby_event_store/outbox/repository.rb', line 135 def release_lock_for_process(fetch_specification, process_uuid) Lock.release(fetch_specification, process_uuid) end |
#retrieve_batch(fetch_specification, batch_size) ⇒ Object
123 124 125 |
# File 'lib/ruby_event_store/outbox/repository.rb', line 123 def retrieve_batch(fetch_specification, batch_size) Record.remaining_for(fetch_specification).order("id ASC").limit(batch_size).to_a end |