Class: Eventus::Persistence::Sequel
- Inherits:
-
Object
- Object
- Eventus::Persistence::Sequel
- Defined in:
- lib/eventus/persistence/sequel.rb
Constant Summary collapse
- MIGRATIONS_PATH =
File.expand_path('../sequel/migrations', __FILE__).freeze
- TABLE_NAME =
:eventus_events
Instance Attribute Summary collapse
-
#dataset ⇒ Object
readonly
Returns the value of attribute dataset.
-
#db ⇒ Object
readonly
Returns the value of attribute db.
Class Method Summary collapse
- .migrate!(db, target = nil) ⇒ Object
Instance Method Summary collapse
- #_load(sid = nil, dispatched = nil, min = nil, max = nil) ⇒ Object
- #commit(events) ⇒ Object
- #convert_row(row) ⇒ Object
-
#initialize(db) ⇒ Sequel
constructor
A new instance of Sequel.
- #load(sid, min = nil, max = nil) ⇒ Object
- #load_undispatched(sid = nil) ⇒ Object
- #mark_dispatched(sid, sequence) ⇒ Object
Constructor Details
#initialize(db) ⇒ Sequel
Returns a new instance of Sequel.
9 10 11 12 13 |
# File 'lib/eventus/persistence/sequel.rb', line 9

# Keeps the database handle and the events dataset, and registers
# #convert_row as the dataset's row_proc.
#
# NOTE(review): this relies on the dataset accepting `row_proc=`; confirm
# that the supported Sequel versions still allow assigning it this way.
#
# @param db [Object] database handle; it is indexed with TABLE_NAME to
#   obtain the dataset (presumably a ::Sequel::Database -- confirm)
def initialize(db)
  @db = db
  @dataset = @db[TABLE_NAME]
  row_converter = method(:convert_row)
  @dataset.row_proc = row_converter
end
Instance Attribute Details
#dataset ⇒ Object (readonly)
Returns the value of attribute dataset.
7 8 9 |
# File 'lib/eventus/persistence/sequel.rb', line 7

# Read-only accessor for the dataset attribute.
#
# @return [Object] the current value of @dataset
def dataset
  @dataset
end
#db ⇒ Object (readonly)
Returns the value of attribute db.
7 8 9 |
# File 'lib/eventus/persistence/sequel.rb', line 7

# Read-only accessor for the db attribute.
#
# @return [Object] the current value of @db
def db
  @db
end
Class Method Details
.migrate!(db, target = nil) ⇒ Object
58 59 60 61 |
# File 'lib/eventus/persistence/sequel.rb', line 58

# Loads Sequel's :migration extension and runs the migrations located at
# MIGRATIONS_PATH against +db+, using :eventus_schema as the :table option.
#
# @param db [Object] database handle handed to ::Sequel::Migrator.run
# @param target [Object, nil] forwarded unchanged as the :target option
# @return [Object] whatever ::Sequel::Migrator.run returns
def self.migrate!(db, target = nil)
  ::Sequel.extension(:migration)
  migrator_options = { :target => target, :table => :eventus_schema }
  ::Sequel::Migrator.run(db, MIGRATIONS_PATH, **migrator_options)
end
Instance Method Details
#_load(sid = nil, dispatched = nil, min = nil, max = nil) ⇒ Object
35 36 37 38 39 40 41 42 |
# File 'lib/eventus/persistence/sequel.rb', line 35

# Fetches rows from the dataset ordered by :sequence, applying only the
# filters whose argument is not nil.
#
# @param sid [Object, nil] when given, restricts to rows with this :sid
# @param dispatched [Object, nil] when given, restricts on :dispatched
#   (false is a real filter value; only nil disables the filter)
# @param min [Object, nil] when given, keeps rows with sequence >= min
# @param max [Object, nil] when given, keeps rows with sequence <= max
# @return [Object] the result of calling #all on the filtered, ordered dataset
def _load(sid = nil, dispatched = nil, min = nil, max = nil)
  # Each entry pairs an argument with the filter it enables; the filters are
  # applied in the listed order.
  filters = [
    [sid,        lambda { |ds| ds.where(:sid => sid) }],
    [dispatched, lambda { |ds| ds.where(:dispatched => dispatched) }],
    [min,        lambda { |ds| ds.where { sequence >= min } }],
    [max,        lambda { |ds| ds.where { sequence <= max } }]
  ]

  scoped = filters.reduce(@dataset) do |ds, (value, apply)|
    value.nil? ? ds : apply.call(ds)
  end

  scoped.order_by(:sequence).all
end
#commit(events) ⇒ Object
15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/eventus/persistence/sequel.rb', line 15

# Inserts +events+ into the dataset inside a transaction opened with
# :savepoint => true.
#
# Each event's 'body' entry is replaced in place with the result of
# db.typecast_value(:json, ...), using {} when the body is nil/false, so the
# hashes passed in by the caller are mutated.
#
# @param events [Enumerable<Hash>] events keyed by strings (reads/writes 'body')
# @return [Object] the same +events+ collection that was passed in
# @raise [Eventus::ConcurrencyError] when the insert raises
#   ::Sequel::UniqueConstraintViolation
def commit(events)
  @db.transaction(:savepoint => true) do
    events.each do |record|
      payload = record['body'] || {}
      record['body'] = db.typecast_value(:json, payload)
    end
    @dataset.multi_insert(events)
  end
  events
rescue ::Sequel::UniqueConstraintViolation
  raise Eventus::ConcurrencyError
end
#convert_row(row) ⇒ Object
50 51 52 53 54 55 56 |
# File 'lib/eventus/persistence/sequel.rb', line 50

# Converts a fetched row into a hash with string keys and a plain-hash body.
#
# @param row [#each_with_object] key/value pairs of one row
# @return [Hash] a new hash whose keys are the row's keys converted with
#   #to_s, and whose 'body' is the row's body converted with #to_hash
#   ({} when the body is missing or nil)
def convert_row(row)
  res = row.each_with_object({}) do |(k, v), memo|
    memo[k.to_s] = v
  end
  # A fetch default only covers a missing key; a row that carries a nil body
  # would otherwise fail on nil.to_hash, so nil is normalised to {} as well.
  res['body'] = (res['body'] || {}).to_hash
  res
end
#load(sid, min = nil, max = nil) ⇒ Object
27 28 29 |
# File 'lib/eventus/persistence/sequel.rb', line 27

# Loads the events of one stream regardless of their dispatched flag.
#
# @param sid [Object] value passed to #_load as the :sid filter
# @param min [Object, nil] passed to #_load as the lower sequence bound
# @param max [Object, nil] passed to #_load as the upper sequence bound
# @return [Object] whatever #_load returns
def load(sid, min = nil, max = nil)
  dispatched = nil # nil leaves the dispatched filter switched off
  _load(sid, dispatched, min, max)
end
#load_undispatched(sid = nil) ⇒ Object
31 32 33 |
# File 'lib/eventus/persistence/sequel.rb', line 31

# Loads events whose dispatched flag is false, optionally for one stream.
#
# @param sid [Object, nil] passed to #_load as the :sid filter; nil means
#   no :sid filter is applied
# @return [Object] whatever #_load returns
def load_undispatched(sid = nil)
  dispatched = false
  _load(sid, dispatched)
end
#mark_dispatched(sid, sequence) ⇒ Object
44 45 46 47 48 |
# File 'lib/eventus/persistence/sequel.rb', line 44

# Sets :dispatched to true on the rows matching the given sid and sequence.
#
# @param sid [Object] value matched against the :sid column
# @param sequence [Object] value matched against the :sequence column
# @return [Object] whatever the dataset's #update returns
def mark_dispatched(sid, sequence)
  matching = @dataset.where(:sid => sid, :sequence => sequence)
  matching.update(:dispatched => true)
end