Class: Eventus::Persistence::Sequel

Inherits:
Object
  • Object
show all
Defined in:
lib/eventus/persistence/sequel.rb

Constant Summary collapse

MIGRATIONS_PATH =
File.expand_path('../sequel/migrations', __FILE__).freeze
TABLE_NAME =
:eventus_events

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(db) ⇒ Sequel

Returns a new instance of Sequel.



9
10
11
12
13
# File 'lib/eventus/persistence/sequel.rb', line 9

# Stores the database handle and prepares the events dataset.
#
# The dataset is looked up on +db+ under TABLE_NAME, and #convert_row is
# installed as its row_proc.
#
# NOTE(review): `row_proc=` mutates the dataset in place; newer Sequel
# releases are believed to freeze datasets (`with_row_proc` is the
# non-mutating form) -- confirm against the Sequel version this gem pins.
#
# @param db the object indexed with TABLE_NAME to obtain the dataset
def initialize(db)
  @db = db
  events = db[TABLE_NAME]
  events.row_proc = method(:convert_row)
  @dataset = events
end

Instance Attribute Details

#dataset ⇒ Object (readonly)

Returns the value of attribute dataset.



7
8
9
# File 'lib/eventus/persistence/sequel.rb', line 7

# Read-only accessor for the @dataset instance variable.
#
# @return the current value of @dataset
def dataset
  @dataset
end

#db ⇒ Object (readonly)

Returns the value of attribute db.



7
8
9
# File 'lib/eventus/persistence/sequel.rb', line 7

# Read-only accessor for the @db instance variable.
#
# @return the current value of @db
def db
  @db
end

Class Method Details

.migrate!(db, target = nil) ⇒ Object



58
59
60
61
# File 'lib/eventus/persistence/sequel.rb', line 58

# Runs the migrations found under MIGRATIONS_PATH against +db+.
#
# Loads Sequel's :migration extension, then delegates to
# ::Sequel::Migrator.run, recording migration state in the
# :eventus_schema table.
#
# @param db the database handle passed through to the migrator
# @param target passed as the migrator's :target option (nil by default)
# @return whatever ::Sequel::Migrator.run returns
def self.migrate!(db, target = nil)
  ::Sequel.extension(:migration)
  options = { :target => target, :table => :eventus_schema }
  ::Sequel::Migrator.run(db, MIGRATIONS_PATH, options)
end

Instance Method Details

#_load(sid = nil, dispatched = nil, min = nil, max = nil) ⇒ Object



35
36
37
38
39
40
41
42
# File 'lib/eventus/persistence/sequel.rb', line 35

# Fetches events from @dataset, narrowing by each argument that is not nil.
#
# A nil argument means "do not filter on this"; note that +false+ is a
# real filter value for +dispatched+, hence the explicit nil checks.
#
# @param sid restricts rows to this :sid when given
# @param dispatched restricts rows to this :dispatched value when given
# @param min keeps rows whose sequence is >= min when given
# @param max keeps rows whose sequence is <= max when given
# @return the result of calling +all+ on the filtered dataset, ordered by
#   :sequence
def _load(sid = nil, dispatched = nil, min = nil, max = nil)
  query = @dataset

  # Equality filters, applied one at a time in this fixed order.
  { :sid => sid, :dispatched => dispatched }.each do |column, value|
    query = query.where(column => value) unless value.nil?
  end

  # Inclusive bounds on the sequence column.
  query = query.where { sequence >= min } unless min.nil?
  query = query.where { sequence <= max } unless max.nil?

  query.order_by(:sequence).all
end

#commit(events) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
# File 'lib/eventus/persistence/sequel.rb', line 15

# Inserts +events+ into @dataset inside a savepoint-enabled transaction.
#
# Each event's 'body' (or an empty hash when it is nil/absent) is replaced
# in place with the result of db.typecast_value(:json, ...) before the
# batch insert, so the caller's event hashes are mutated.
#
# @param events the collection of event hashes to persist
# @return the same +events+ object that was passed in
# @raise [Eventus::ConcurrencyError] when the insert raises
#   ::Sequel::UniqueConstraintViolation
def commit(events)
  @db.transaction(:savepoint => true) do
    events.each do |record|
      payload = record['body'] || {}
      record['body'] = db.typecast_value(:json, payload)
    end

    @dataset.multi_insert(events)
  end

  events
rescue ::Sequel::UniqueConstraintViolation
  raise Eventus::ConcurrencyError
end

#convert_row(row) ⇒ Object



50
51
52
53
54
55
56
# File 'lib/eventus/persistence/sequel.rb', line 50

# Normalises a row into a hash with string keys and a Hash 'body'.
#
# Every key of +row+ is converted with +to_s+. The 'body' entry is then
# replaced by its +to_hash+ result; a missing *or nil* body becomes an
# empty hash.
#
# @param row an enumerable of key/value pairs (e.g. a Hash)
# @return [Hash] a new hash with string keys; +row+ itself is not modified
def convert_row(row)
  res = row.each_with_object({}) do |(key, value), memo|
    memo[key.to_s] = value
  end

  # Hash#fetch only applies its default when the key is absent, so a row
  # whose body is present but nil would previously blow up on nil.to_hash.
  res['body'] = (res['body'] || {}).to_hash
  res
end

#load(sid, min = nil, max = nil) ⇒ Object



27
28
29
# File 'lib/eventus/persistence/sequel.rb', line 27

# Loads events for +sid+, optionally bounded by +min+ and +max+.
#
# Delegates to #_load with a nil dispatched argument, i.e. without
# constraining on the dispatched flag.
#
# @param sid passed to #_load as the sid filter
# @param min passed to #_load as the lower bound (nil by default)
# @param max passed to #_load as the upper bound (nil by default)
# @return whatever #_load returns
def load(sid, min = nil, max = nil)
  dispatched = nil # nil => _load applies no dispatched filter
  _load(sid, dispatched, min, max)
end

#load_undispatched(sid = nil) ⇒ Object



31
32
33
# File 'lib/eventus/persistence/sequel.rb', line 31

# Loads events whose dispatched flag is false.
#
# Delegates to #_load with dispatched = false and no sequence bounds.
#
# @param sid passed to #_load as the sid filter (nil by default)
# @return whatever #_load returns
def load_undispatched(sid = nil)
  dispatched = false
  _load(sid, dispatched)
end

#mark_dispatched(sid, sequence) ⇒ Object



44
45
46
47
48
# File 'lib/eventus/persistence/sequel.rb', line 44

# Sets dispatched to true on the rows matching +sid+ and +sequence+.
#
# @param sid the :sid value to match
# @param sequence the :sequence value to match
# @return whatever the dataset's +update+ call returns
def mark_dispatched(sid, sequence)
  matching = @dataset.where(:sid => sid, :sequence => sequence)
  matching.update(:dispatched => true)
end