Class: Eventus::Persistence::KyotoCabinet
- Inherits:
-
Object
- Object
- Eventus::Persistence::KyotoCabinet
- Defined in:
- lib/eventus/persistence/kyotocabinet.rb
Instance Method Summary collapse
- #build_key(id, index) ⇒ Object
- #close ⇒ Object
- #commit(events) ⇒ Object
-
#initialize(options = {}) ⇒ KyotoCabinet
constructor
A new instance of KyotoCabinet.
- #load(id, min = nil) ⇒ Object
- #load_undispatched ⇒ Object
- #mark_dispatched(stream_id, sequence) ⇒ Object
- #pack_hex(id) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ KyotoCabinet
Returns a new instance of KyotoCabinet.
7 8 9 10 11 12 13 14 15 |
# File 'lib/eventus/persistence/kyotocabinet.rb', line 7 def initialize( = {}) @db = ::KyotoCabinet::DB::new @queue = ::KyotoCabinet::DB::new @serializer = .delete(:serializer) || Eventus::Serializers::Marshal queue_con = build_connection(:path => .delete(:queue_path) || '*') con = build_connection() Eventus.logger.info "Opening db: #{con}, queue: #{queue_con}" raise Eventus::ConnectionError unless @db.open(con) && @queue.open(queue_con) end |
Instance Method Details
#build_key(id, index) ⇒ Object
66 67 68 |
# File 'lib/eventus/persistence/kyotocabinet.rb', line 66 def build_key(id, index) id + ("_%07d" % index) end |
#close ⇒ Object
70 71 72 73 |
# File 'lib/eventus/persistence/kyotocabinet.rb', line 70 def close @db.close @queue.close end |
#commit(events) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/eventus/persistence/kyotocabinet.rb', line 17 def commit(events) @db.transaction do events.each do |event, index| pid = pack_hex(event['sid']) key = build_key(pid, event['sequence']) value = @serializer.serialize(event) raise Eventus::ConcurrencyError unless @db.add(key,value) @queue.set(key, "") end end events end |
#load(id, min = nil) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/eventus/persistence/kyotocabinet.rb', line 30 def load(id, min = nil) Eventus.logger.debug "Loading stream: #{id}" pid = pack_hex(id) keys = @db.match_prefix(pid) if min min_key = build_key(pid, min) keys = keys.drop_while { |k| k != min_key } end @db.get_bulk(keys, false).values.map { |obj| @serializer.deserialize(obj) } end |
#load_undispatched ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/eventus/persistence/kyotocabinet.rb', line 43 def load_undispatched events = [] @queue.each_key do |key| value = @db.get(key[0]) next unless value obj = @serializer.deserialize(value) events << obj end Eventus.logger.info "#{events.length} undispatched events loaded" events end |
#mark_dispatched(stream_id, sequence) ⇒ Object
55 56 57 58 59 |
# File 'lib/eventus/persistence/kyotocabinet.rb', line 55 def mark_dispatched(stream_id, sequence) Eventus.logger.debug "Marking #{stream_id}_#{sequence} dispatched" key = build_key(pack_hex(stream_id), sequence) @queue.remove(key) end |
#pack_hex(id) ⇒ Object
61 62 63 64 |
# File 'lib/eventus/persistence/kyotocabinet.rb', line 61 def pack_hex(id) id #id.match(/^[0-9a-fA-F]+$/) ? [id].pack('H*') : id end |