Class: StrokeDB::Store
- Includes:
- Enumerable, ChainSync
- Defined in:
- lib/strokedb/store.rb,
lib/strokedb/sync/store_sync.rb,
lib/strokedb/stores/remote_store.rb
Instance Attribute Summary collapse
-
#index_store ⇒ Object
Returns the value of attribute index_store.
-
#storage ⇒ Object
Returns the value of attribute storage.
-
#timestamp ⇒ Object
readonly
Returns the value of attribute timestamp.
-
#uuid ⇒ Object
Returns the value of attribute uuid.
Instance Method Summary collapse
- #autosync! ⇒ Object
- #document ⇒ Object
- #each(options = {}, &block) ⇒ Object
- #find(uuid, version = nil, opts = {}, &block) ⇒ Object
- #head_version(uuid) ⇒ Object
- #include?(uuid, version = nil) ⇒ Boolean (also: #contains?)
-
#initialize(options = {}) ⇒ Store
constructor
A new instance of Store.
- #inspect ⇒ Object
- #next_timestamp! ⇒ Object
- #path ⇒ Object
- #remote_server(addr, protocol = :drb) ⇒ Object
- #save!(doc) ⇒ Object
- #save_as_head!(doc) ⇒ Object
- #search(*args) ⇒ Object
- #stop_autosync! ⇒ Object
- #sync!(docs, _timestamp = nil) ⇒ Object
Methods included from ChainSync
Methods included from Enumerable
#each_consecutive_pair, #group_by, #map_with_index
Constructor Details
#initialize(options = {}) ⇒ Store
Returns a new instance of Store.
10 11 12 13 14 15 16 17 |
# File 'lib/strokedb/store.rb', line 10 def initialize( = {}) @options = .stringify_keys @storage = @options['storage'] @index_store = @options['index'] initialize_files autosync! unless @options['noautosync'] raise "Missing chunk storage" unless @storage end |
Instance Attribute Details
#index_store ⇒ Object
Returns the value of attribute index_store.
7 8 9 |
# File 'lib/strokedb/store.rb', line 7 def index_store @index_store end |
#storage ⇒ Object
Returns the value of attribute storage.
7 8 9 |
# File 'lib/strokedb/store.rb', line 7 def storage @storage end |
#timestamp ⇒ Object (readonly)
Returns the value of attribute timestamp.
8 9 10 |
# File 'lib/strokedb/store.rb', line 8 def @timestamp end |
#uuid ⇒ Object
Returns the value of attribute uuid.
7 8 9 |
# File 'lib/strokedb/store.rb', line 7 def uuid @uuid end |
Instance Method Details
#autosync! ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/strokedb/store.rb', line 83 def autosync! @autosync_mutex ||= Mutex.new @autosync = nil if @autosync && !@autosync.status at_exit { stop_autosync! } @autosync ||= Thread.new do until @stop_autosync @autosync_mutex.synchronize { storage.sync_chained_storages! } sleep(1) end end end |
#document ⇒ Object
75 76 77 |
# File 'lib/strokedb/store.rb', line 75 def document find(uuid) || StoreInfo.create!(self, :uuid => uuid) end |
#each(options = {}, &block) ⇒ Object
59 60 61 |
# File 'lib/strokedb/store.rb', line 59 def each( = {},&block) @storage.each(.merge(:store => self),&block) end |
#find(uuid, version = nil, opts = {}, &block) ⇒ Object
19 20 21 |
# File 'lib/strokedb/store.rb', line 19 def find(uuid, version=nil, opts = {}, &block) @storage.find(uuid,version,opts.merge(:store => self),&block) end |
#head_version(uuid) ⇒ Object
33 34 35 |
# File 'lib/strokedb/store.rb', line 33 def head_version(uuid) @storage.head_version(uuid,{ :store => self }) end |
#include?(uuid, version = nil) ⇒ Boolean Also known as: contains?
28 29 30 |
# File 'lib/strokedb/store.rb', line 28 def include?(uuid,version=nil) @storage.include?(uuid,version) end |
#inspect ⇒ Object
79 80 81 |
# File 'lib/strokedb/store.rb', line 79 def inspect "#<Store #{uuid}>" end |
#next_timestamp! ⇒ Object
63 64 65 66 67 |
# File 'lib/strokedb/store.rb', line 63 def @timestamp = .next @timestamp end |
#path ⇒ Object
101 102 103 |
# File 'lib/strokedb/store.rb', line 101 def path @options['path'] end |
#remote_server(addr, protocol = :drb) ⇒ Object
7 8 9 10 11 12 13 14 |
# File 'lib/strokedb/stores/remote_store.rb', line 7 def remote_server(addr, protocol=:drb) case protocol when :drb RemoteStore::DRb::Server.new(self,"#{addr}") else raise "No #{protocol} protocol" end end |
#save!(doc) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/strokedb/store.rb', line 37 def save!(doc) storage.save!(doc, ) # if @index_store if doc.previous_version raw_pdoc = find(doc.uuid,doc.previous_version,:no_instantiation => true) pdoc = Document.from_raw(self,raw_pdoc.freeze,:skip_callbacks => true) pdoc.extend(VersionedDocument) @index_store.delete(pdoc) end @index_store.insert(doc) @index_store.save! end end |
#save_as_head!(doc) ⇒ Object
53 54 55 |
# File 'lib/strokedb/store.rb', line 53 def save_as_head!(doc) @storage.save_as_head!(doc,) end |
#search(*args) ⇒ Object
23 24 25 26 |
# File 'lib/strokedb/store.rb', line 23 def search(*args) return [] unless @index_store @index_store.find(*args) end |
#stop_autosync! ⇒ Object
95 96 97 98 99 |
# File 'lib/strokedb/store.rb', line 95 def stop_autosync! if @autosync_mutex @autosync_mutex.synchronize { @stop_autosync = true; storage.sync_chained_storages! } end end |
#sync!(docs, _timestamp = nil) ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/strokedb/sync/store_sync.rb', line 19 def sync!(docs, =nil) = .counter report = SynchronizationReport.new(self, :store_document => document, :timestamp => ) existing_chain = {} docs.group_by {|doc| doc.uuid}.each_pair do |uuid, versions| doc = find(uuid) existing_chain[uuid] = doc.versions.all_versions.map {|v| [v, doc.versions[v].to_json ]} if doc end case when Numeric @timestamp = LTS.new(, .uuid) when LamportTimestamp @timestamp = LTS.new(.counter, .uuid) else end @txn = Transaction.new(:store => self) @txn.execute do |txn| docs.each {|doc| save!(doc) } docs.group_by {|doc| doc.uuid}.each_pair do |uuid, versions| incoming_chain = find(uuid, versions.last.version).versions.all_versions.map {|v| [v, find(uuid,v).to_json ]} if existing_chain[uuid].nil? or existing_chain[uuid].empty? # It is a new document added_doc = find(uuid, versions.last.version) save_as_head!(added_doc) report.added_documents << added_doc else begin sync = sync_chains(incoming_chain.reverse, existing_chain[uuid].reverse) rescue NonMatchingChains # raise NonMatchingDocumentCondition.new(uuid) # that will definitely leave garbage in the store (FIXME?) txn.rollback! non_matching_doc = find(uuid) report.non_matching_documents << non_matching_doc next end resolution = sync.is_a?(Array) ? sync.first : sync case resolution when :up_to_date # nothing to do txn.commit! when :merge report.conflicts << SynchronizationConflict.create!(self, :document => find(uuid), :rev1 => sync[1].map{|e| e[0]}.reverse, :rev2 => sync[2].map{|e| e[0]}.reverse) txn.commit! when :fast_forward fast_forwarded_doc = find(uuid, sync[1].last.first) save_as_head!(fast_forwarded_doc) report.fast_forwarded_documents << fast_forwarded_doc txn.commit! else txn.rollback! raise "Invalid sync resolution #{resolution}" end end end end report.conflicts.each do |conflict| if resolution_strategy = conflict.document.[:resolution_strategy] conflict. << resolution_strategy conflict.save! end conflict.resolve! end report.save! end |