Class: Eventus::Persistence::Redis

Inherits:
Object
  • Object
show all
Defined in:
lib/eventus/persistence/redis.rb

Constant Summary collapse

# Lua script that appends a batch of events to a stream with an
# optimistic version check, executed server-side as a single script.
#
# Inputs:
#   KEYS[1]  - key of the sorted set holding the stream
#   ARGV[1]  - sequence number expected for the first new event
#   ARGV[2+] - the events to append, one argument each
#
# The script counts the members already in the sorted set and adds one; if
# that does not equal ARGV[1] it answers with the error reply 'conflict'.
# Otherwise each event is added with consecutive scores starting at ARGV[1]
# and the reply is the pair {'commit', <version as string>}.
#
# Frozen so the shared script source cannot be mutated at runtime.
COMMIT_LUA = <<-LUA.freeze
local streamId = KEYS[1]
local version = tonumber(ARGV[1])

local actualVersion = tonumber(redis.call('zcount', streamId, '-inf', '+inf')) + 1
if actualVersion ~= version then
  return redis.error_reply('conflict')
end

for i=2,#ARGV do
  redis.call('zadd', streamId, version+i-2, ARGV[i])
end

return {'commit', tostring(version)}
LUA

Instance Method Summary collapse

Constructor Details

#initialize(redis) ⇒ Redis

Returns a new instance of Eventus::Persistence::Redis.



6
7
8
# File 'lib/eventus/persistence/redis.rb', line 6

# Builds a persistence adapter around an existing Redis connection.
#
# @param redis [Object] client used for all storage calls; the other
#   methods of this class invoke `zrange`, `script` and `evalsha` on it
def initialize(redis)
  @redis = redis
end

Instance Method Details

#commit(events) ⇒ Object



15
16
17
18
19
20
21
22
23
# File 'lib/eventus/persistence/redis.rb', line 15

# Appends a batch of events to the stream they belong to.
#
# The stream key is read from the first event's 'sid' entry and the expected
# version from its 'sequence' entry; every event is serialized with #to_json
# before being handed to #run_commit.
#
# @param events [Array<#[], #to_json>] events to persist, in order; the first
#   one must expose 'sid' and 'sequence'
# @return [Object, nil] whatever #run_commit returns, or nil when +events+
#   is empty (nothing is sent to Redis in that case)
# @raise [Eventus::ConcurrencyError] when the commit is rejected with a
#   'conflict' error, i.e. the stream is not at the expected version
# @raise [::Redis::CommandError] for any other command failure
def commit(events)
  # Nothing to persist: avoid dereferencing a missing first event.
  return if events.empty?

  first = events.first
  json_events = events.map(&:to_json)
  run_commit first['sid'], first['sequence'], json_events
rescue ::Redis::CommandError => e
  # Only the 'conflict' reply signals a version clash. Anything else (wrong
  # key type, missing script, out of memory, ...) is a genuine failure and
  # must not be disguised as a concurrency problem.
  raise unless e.message.include?('conflict')

  raise Eventus::ConcurrencyError
end

#load(id, min = 1) ⇒ Object



10
11
12
13
# File 'lib/eventus/persistence/redis.rb', line 10

# Reads the stored events of a stream and decodes them from JSON.
#
# @param id [Object] key of the sorted set to read
# @param min [Integer] 1-based position of the first event to return;
#   converted to a 0-based index for ZRANGE
# @return [Array] the parsed events from that position to the end of the set
def load(id, min=1)
  start_index = min - 1
  @redis.zrange(id, start_index, -1).map { |raw| JSON.parse(raw) }
end

#run_commit(streamId, version, events) ⇒ Object



25
26
27
28
# File 'lib/eventus/persistence/redis.rb', line 25

# Executes COMMIT_LUA through EVALSHA, loading the script on first use and
# remembering its SHA in @sha for later calls.
#
# If the server no longer knows the cached SHA (its script cache was emptied,
# e.g. after a restart or SCRIPT FLUSH) the script is loaded again and the
# call is retried once, instead of failing on every subsequent commit.
#
# @param streamId [Object] key of the stream, passed as the script's only key
# @param version [Object] expected sequence of the first event (first script argument)
# @param events [Array] serialized events, passed as the remaining script arguments
# @return [Object] the reply of the script as returned by the client
# @raise [::Redis::CommandError] when the script (or the retry) fails
def run_commit(streamId, version, events)
  args = [version] + events
  @sha ||= @redis.script(:load, COMMIT_LUA)
  @redis.evalsha(@sha, [streamId], args)
rescue ::Redis::CommandError => e
  # Only a missing script is recoverable here; let everything else surface.
  raise unless e.message.start_with?('NOSCRIPT')

  @sha = @redis.script(:load, COMMIT_LUA)
  @redis.evalsha(@sha, [streamId], args)
end