Class: RubyEventStore::ROM::Relations::StreamEntries
- Inherits:
-
Object
- Object
- RubyEventStore::ROM::Relations::StreamEntries
- Defined in:
- lib/ruby_event_store/rom/relations/stream_entries.rb
Constant Summary collapse
- DIRECTION_MAP =
{ forward: %i[asc > <], backward: %i[desc < >] }.freeze
Instance Method Summary collapse
- #by_event_id(event_id) ⇒ Object
- #by_event_type(types) ⇒ Object
- #by_stream(stream) ⇒ Object
- #by_stream_and_event_id(stream, event_id) ⇒ Object
- #create_changeset(tuples) ⇒ Object
- #join_events ⇒ Object
- #max_position(stream) ⇒ Object
- #newer_than(time, time_sort_by) ⇒ Object
- #newer_than_or_equal(time, time_sort_by) ⇒ Object
- #older_than(time, time_sort_by) ⇒ Object
- #older_than_or_equal(time, time_sort_by) ⇒ Object
- #ordered(direction, stream, offset_entry_id = nil, stop_entry_id = nil, time_sort_by = nil) ⇒ Object
Instance Method Details
#by_event_id(event_id) ⇒ Object
21 22 23 |
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 21 def by_event_id(event_id) where(event_id: event_id) end |
#by_event_type(types) ⇒ Object
25 26 27 |
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 25 def by_event_type(types) join_events.where(event_type: types) end |
#by_stream(stream) ⇒ Object
17 18 19 |
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 17 def by_stream(stream) where(stream: stream.name) end |
#by_stream_and_event_id(stream, event_id) ⇒ Object
29 30 31 |
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 29 def by_stream_and_event_id(stream, event_id) where(stream: stream.name, event_id: event_id).one! end |
#create_changeset(tuples) ⇒ Object
13 14 15 |
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 13 def create_changeset(tuples) changeset(ROM::Changesets::CreateStreamEntries, tuples) end |
#join_events ⇒ Object
97 98 99 100 101 102 103 |
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 97 def join_events if dataset.opts[:join]&.map(&:table)&.include?(events.dataset.first_source_table) self else join(:events) end end |
#max_position(stream) ⇒ Object
33 34 35 |
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 33 def max_position(stream) by_stream(stream).select(:position).order(Sequel.desc(:position)).first end |
#newer_than(time, time_sort_by) ⇒ Object
37 38 39 40 41 42 43 |
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 37 def newer_than(time, time_sort_by) if time_sort_by == :as_of join_events.where { |r| string::coalesce(r.events[:valid_at], r.events[:created_at]) > time.localtime } else join_events.where { |r| r.events[:created_at] > time.localtime } end end |
#newer_than_or_equal(time, time_sort_by) ⇒ Object
45 46 47 48 49 50 51 |
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 45 def newer_than_or_equal(time, time_sort_by) if time_sort_by == :as_of join_events.where { |r| string::coalesce(r.events[:valid_at], r.events[:created_at]) >= time.localtime } else join_events.where { |r| r.events[:created_at] >= time.localtime } end end |
#older_than(time, time_sort_by) ⇒ Object
53 54 55 56 57 58 59 |
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 53 def older_than(time, time_sort_by) if time_sort_by == :as_of join_events.where { |r| string::coalesce(r.events[:valid_at], r.events[:created_at]) < time.localtime } else join_events.where { |r| r.events[:created_at] < time.localtime } end end |
#older_than_or_equal(time, time_sort_by) ⇒ Object
61 62 63 64 65 66 67 |
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 61 def older_than_or_equal(time, time_sort_by) if time_sort_by == :as_of join_events.where { |r| string::coalesce(r.events[:valid_at], r.events[:created_at]) <= time.localtime } else join_events.where { |r| r.events[:created_at] <= time.localtime } end end |
#ordered(direction, stream, offset_entry_id = nil, stop_entry_id = nil, time_sort_by = nil) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/ruby_event_store/rom/relations/stream_entries.rb', line 71 def ordered(direction, stream, offset_entry_id = nil, stop_entry_id = nil, time_sort_by = nil) order, operator_offset, operator_stop = DIRECTION_MAP[direction] raise ArgumentError, "Direction must be :forward or :backward" if order.nil? event_order_columns = [] stream_order_columns = %i[id] case time_sort_by when :as_at event_order_columns.unshift :created_at when :as_of event_order_columns.unshift :valid_at end query = by_stream(stream) query = query.where { id.public_send(operator_offset, offset_entry_id) } if offset_entry_id query = query.where { id.public_send(operator_stop, stop_entry_id) } if stop_entry_id if event_order_columns.empty? query.order { |r| stream_order_columns.map { |c| r[:stream_entries][c].public_send(order) } } else query.join_events.order { |r| event_order_columns.map { |c| r.events[c].public_send(order) } } end end |