Class: Eventus::Persistence::KyotoCabinet

Inherits:
Object
  • Object
show all
Defined in:
lib/eventus/persistence/kyotocabinet.rb

Instance Method Summary

Constructor Details

#initialize(options = {}) ⇒ KyotoCabinet

Returns a new instance of KyotoCabinet.



7
8
9
10
11
12
13
14
15
# File 'lib/eventus/persistence/kyotocabinet.rb', line 7

# Opens the main event database and the dispatch-queue database.
#
# @param options [Hash] connection options. :serializer selects the
#   serializer (defaults to Eventus::Serializers::Marshal) and :queue_path
#   the queue location (defaults to '*'); the remaining entries are handed
#   to build_connection for the main database.
# @raise [Eventus::ConnectionError] if either database fails to open
def initialize(options = {})
  # Work on a copy: the deletes below must not strip keys from the caller's hash.
  options = options.dup
  @db = ::KyotoCabinet::DB::new
  @queue = ::KyotoCabinet::DB::new
  @serializer = options.delete(:serializer) || Eventus::Serializers::Marshal
  queue_con = build_connection(:path => options.delete(:queue_path) || '*')
  con = build_connection(options)
  Eventus.logger.info "Opening db: #{con}, queue: #{queue_con}"
  raise Eventus::ConnectionError unless @db.open(con)
  unless @queue.open(queue_con)
    # Don't leave the main database open when construction fails half-way.
    @db.close
    raise Eventus::ConnectionError
  end
end

Instance Method Details

#build_key(id, index) ⇒ Object



66
67
68
# File 'lib/eventus/persistence/kyotocabinet.rb', line 66

# Builds the storage key for one event of a stream: the stream id followed
# by an underscore and the index zero-padded to seven digits.
#
# @param id [String] (packed) stream id
# @param index [Integer] event sequence number
# @return [String] e.g. "abc_0000005" for ("abc", 5)
def build_key(id, index)
  padded_suffix = "_%07d" % index
  id + padded_suffix
end

#close ⇒ Object



70
71
72
73
# File 'lib/eventus/persistence/kyotocabinet.rb', line 70

# Closes the main database first, then the dispatch queue.
#
# @return [Object] whatever the queue's #close call returns
def close
  # Order matters: main db, then queue; the queue's result is the return value.
  [@db, @queue].map(&:close).last
end

#commit(events) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/eventus/persistence/kyotocabinet.rb', line 17

# Persists a batch of events inside one transaction on the main database
# and registers each one in the dispatch queue.
#
# @param events [Enumerable<Hash>] events carrying 'sid' (stream id) and
#   'sequence' entries
# @return [Enumerable<Hash>] the events that were passed in
# @raise [Eventus::ConcurrencyError] if a key for (sid, sequence) already exists
def commit(events)
  # NOTE(review): the result of #transaction is ignored, so a transaction
  # that fails to begin or commit would go unnoticed here — confirm whether
  # callers should be told.
  @db.transaction do
    events.each do |event|
      pid = pack_hex(event['sid'])
      key = build_key(pid, event['sequence'])
      value = @serializer.serialize(event)
      # #add refuses to overwrite, which is what detects a concurrent writer.
      raise Eventus::ConcurrencyError unless @db.add(key, value)
      # The queue value is unused; only the key's presence marks "undispatched".
      @queue.set(key, "")
    end
    # The Kyoto Cabinet Ruby docs state the transaction is committed when the
    # block returns true, so say so explicitly rather than leaking `events`.
    true
  end
  events
end

#load(id, min = nil) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/eventus/persistence/kyotocabinet.rb', line 30

# Loads and deserializes the stored events of one stream.
#
# @param id [String] stream id
# @param min [Integer, nil] lowest sequence number to include; when nil
#   every event of the stream is loaded
# @return [Array] deserialized events, in the order #get_bulk reports them
def load(id, min = nil)
  Eventus.logger.debug "Loading stream: #{id}"
  pid = pack_hex(id)
  # Match on the id *plus* the "_" separator that keys are built with;
  # matching on the bare id would also pull in any stream whose id merely
  # starts with this one (e.g. "a" vs "ab").
  keys = @db.match_prefix(pid + "_")

  if min
    min_key = build_key(pid, min)
    # Sequence numbers are zero-padded in the key, so string comparison
    # follows numeric order. Filtering (rather than dropping until an exact
    # match) still works when no event with exactly `min` exists and does not
    # depend on the order in which the keys were returned.
    keys = keys.select { |key| key >= min_key }
  end

  @db.get_bulk(keys, false).values.map { |obj| @serializer.deserialize(obj) }
end

#load_undispatched ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
# File 'lib/eventus/persistence/kyotocabinet.rb', line 43

# Collects every event whose key is still present in the dispatch queue.
# Queue keys with no matching record in the main database are skipped.
#
# @return [Array] deserialized undispatched events
def load_undispatched
  pending = []
  @queue.each_key do |entry|
    # The first element of the yielded value is used as the lookup key.
    raw = @db.get(entry[0])
    pending << @serializer.deserialize(raw) if raw
  end
  Eventus.logger.info "#{pending.length} undispatched events loaded"
  pending
end

#mark_dispatched(stream_id, sequence) ⇒ Object



55
56
57
58
59
# File 'lib/eventus/persistence/kyotocabinet.rb', line 55

# Removes one event's key from the dispatch queue.
#
# @param stream_id [String] id of the stream the event belongs to
# @param sequence [Integer] sequence number of the event within the stream
# @return [Object] whatever the queue's #remove call returns
def mark_dispatched(stream_id, sequence)
  Eventus.logger.debug "Marking #{stream_id}_#{sequence} dispatched"
  @queue.remove(build_key(pack_hex(stream_id), sequence))
end

#pack_hex(id) ⇒ Object



61
62
63
64
# File 'lib/eventus/persistence/kyotocabinet.rb', line 61

# Maps a stream id to the form used inside storage keys. Currently an
# identity function: the id is returned unchanged.
#
# @param id [Object] stream id
# @return [Object] the same id
def pack_hex(id)
  id
  # Disabled alternative: pack ids made up solely of hex digits into raw bytes.
  #id.match(/^[0-9a-fA-F]+$/) ? [id].pack('H*') : id
end