Class: Euston::EventStore::Persistence::Mongodb::MongoPersistenceEngine
- Inherits:
-
Object
- Object
- Euston::EventStore::Persistence::Mongodb::MongoPersistenceEngine
- Includes:
- MongoConcurrencyDetection
- Defined in:
- lib/euston-eventstore/persistence/mongodb/mongo_persistence_engine.rb
Instance Method Summary collapse
- #add_snapshot(snapshot) ⇒ Object
- #commit(attempt) ⇒ Object
- #get_from(options) ⇒ Object
- #get_snapshot(stream_id, max_revision) ⇒ Object
- #get_streams_to_snapshot(max_threshold) ⇒ Object
- #get_undispatched_commits(component_id = nil) ⇒ Object
- #init ⇒ Object
-
#initialize(store) ⇒ MongoPersistenceEngine
constructor
A new instance of MongoPersistenceEngine.
- #mark_commit_as_dispatched(commit) ⇒ Object
- #mark_commits_as_dispatched(commits) ⇒ Object
- #take_ownership_of_undispatched_commits(component_id) ⇒ Object
Constructor Details
#initialize(store) ⇒ MongoPersistenceEngine
Returns a new instance of MongoPersistenceEngine.
8 9 10 11 12 13 14 15 |
# File 'lib/euston-eventstore/persistence/mongodb/mongo_persistence_engine.rb', line 8
# Stores the given store handle and creates any of the 'commits', 'snapshots'
# and 'streams' collections that the store does not already list.
#
# @param store object responding to +collection_names+ and +create_collection+
#   (presumably a Mongo database handle -- confirm against callers)
def initialize(store)
  @store = store
  existing = store.collection_names

  %w[commits snapshots streams].each do |name|
    store.create_collection name unless existing.include? name
  end
end
Instance Method Details
#add_snapshot(snapshot) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/euston-eventstore/persistence/mongodb/mongo_persistence_engine.rb', line 17
# Upserts a snapshot document and then records the snapshot revision on the
# matching stream head.
#
# @param snapshot a Hash, or an object responding to +to_hash+; +stream_id+
#   and +stream_revision+ are also called on it after the upsert
# @return [Boolean] false when +snapshot+ is nil or a Mongo::OperationFailure
#   is raised, true otherwise
def add_snapshot(snapshot)
  return false if snapshot.nil?

  mongo_snapshot = snapshot.is_a?(Hash) ? snapshot : snapshot.to_hash

  # Upsert the snapshot itself, keyed by its '_id'.
  snapshot_key = { '_id' => mongo_snapshot[:_id] }
  snapshot_doc = {
    'headers' => mongo_snapshot[:headers],
    'payload' => mongo_snapshot[:payload]
  }.merge(snapshot_key)
  persisted_snapshots.update snapshot_key, snapshot_doc, :upsert => true

  # Record on the stream head how far the head is ahead of this snapshot.
  head_key = { '_id' => snapshot.stream_id }
  stream_head = MongoStreamHead.from_hash persisted_stream_heads.find_one(head_key)
  head_changes = {
    '$set' => {
      'snapshot_revision' => snapshot.stream_revision,
      'unsnapshotted' => stream_head.head_revision - snapshot.stream_revision
    }
  }
  persisted_stream_heads.update head_key, head_changes

  true
rescue Mongo::OperationFailure
  false
end
#commit(attempt) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/euston-eventstore/persistence/mongodb/mongo_persistence_engine.rb', line 43
# Inserts the commit built from +attempt+ and then updates the stream head,
# translating storage errors into event store errors.
#
# @param attempt object responding to +to_mongo_commit+, +to_id_query+,
#   +commit_id+, +stream_id+, +stream_revision+ and +events+
# @raise [Euston::EventStore::DuplicateCommitError] from the E11000 handler
#   when a stored commit with the same id query carries the same commit_id
# @raise [Euston::EventStore::ConcurrencyError] from the E11000 handler otherwise
# @raise [EventStore::StorageError] from the handler for any other error,
#   carrying the original message and backtrace
def commit(attempt)
  try_mongo do
    commit = attempt.to_mongo_commit

    # E11000 handler: look up what is already stored under this commit's id
    # to tell a re-sent commit apart from a competing writer.
    on_e11000_error = ->(_e) do
      committed = persisted_commits.find_one(attempt.to_id_query)
      raise Euston::EventStore::DuplicateCommitError if !committed.nil? && committed['commit_id'] == attempt.commit_id
      raise Euston::EventStore::ConcurrencyError
    end

    # Forward the original message and backtrace so the cause is not lost.
    on_other_error = ->(e) { raise EventStore::StorageError, e.message, e.backtrace }

    detect_mongo_concurrency :on_e11000_error => on_e11000_error, :on_other_error => on_other_error do
      persisted_commits.insert commit
      update_stream_head_async attempt.stream_id, attempt.stream_revision, attempt.events.length
    end
  end
end
#get_from(options) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/euston-eventstore/persistence/mongodb/mongo_persistence_engine.rb', line 62
# Finds commits either from a timestamp onwards or within a revision range of
# a single stream, sorted ascending by the field that was queried.
#
# @param options [Hash] either +:timestamp+ (converted with +to_f+), or
#   +:stream_id+ together with +:min_revision+ and +:max_revision+ (both
#   bounds inclusive)
# @return [Array] the result of MongoCommit.from_hash for each document found
def get_from(options)
  try_mongo do
    if options.has_key? :timestamp
      query = { 'commit_timestamp' => { '$gte' => options[:timestamp].to_f } }
      order = [ 'commit_timestamp', Mongo::ASCENDING ]
    else
      query = { '_id.stream_id' => options[:stream_id],
                'events.stream_revision' => { '$gte' => options[:min_revision], '$lte' => options[:max_revision] } }
      order = [ 'events.stream_revision', Mongo::ASCENDING ]
    end

    persisted_commits.find(query, :sort => order).to_a.map { |hash| MongoCommit.from_hash hash }
  end
end
#get_snapshot(stream_id, max_revision) ⇒ Object
78 79 80 81 82 83 84 85 86 |
# File 'lib/euston-eventstore/persistence/mongodb/mongo_persistence_engine.rb', line 78
# Looks up one snapshot for a stream whose '_id' lies above the
# (stream_id, nil) bound and at or below the (stream_id, max_revision) bound,
# taking the first result when sorted by '_id' descending.
#
# @param stream_id identifier of the stream
# @param max_revision upper (inclusive) bound for 'stream_revision'
# @return the result of MongoSnapshot.from_hash for the first document found
#   (presumably nil when nothing matches -- depends on the cursor's +map+)
def get_snapshot(stream_id, max_revision)
  try_mongo do
    lower_bound = { 'stream_id' => stream_id, 'stream_revision' => nil }
    upper_bound = { 'stream_id' => stream_id, 'stream_revision' => max_revision }
    id_range = { '_id' => { '$gt' => lower_bound, '$lte' => upper_bound } }
    newest_first = [ '_id', Mongo::DESCENDING ]

    found = persisted_snapshots.find(id_range, :sort => newest_first, :limit => 1)
    found.map { |doc| MongoSnapshot.from_hash doc }.first
  end
end
#get_streams_to_snapshot(max_threshold) ⇒ Object
88 89 90 91 92 93 94 95 |
# File 'lib/euston-eventstore/persistence/mongodb/mongo_persistence_engine.rb', line 88
# Finds stream heads whose 'unsnapshotted' value is at least +max_threshold+,
# sorted by 'unsnapshotted' descending.
#
# @param max_threshold minimum (inclusive) 'unsnapshotted' value to match
# @return the result of MongoStreamHead.from_hash for each document found
def get_streams_to_snapshot(max_threshold)
  try_mongo do
    over_threshold = { 'unsnapshotted' => { '$gte' => max_threshold } }
    largest_first = [ 'unsnapshotted', Mongo::DESCENDING ]

    heads = persisted_stream_heads.find(over_threshold, :sort => largest_first)
    heads.map { |doc| MongoStreamHead.from_hash doc }
  end
end
#get_undispatched_commits(component_id = nil) ⇒ Object
97 98 99 100 101 102 103 104 105 106 |
# File 'lib/euston-eventstore/persistence/mongodb/mongo_persistence_engine.rb', line 97
# Finds commits whose 'dispatched' flag is false, sorted by 'commit_timestamp'
# ascending, optionally restricted to one component.
#
# @param component_id when not nil, only commits with this 'component_id'
#   are matched
# @return [Array] the result of MongoCommit.from_hash for each document found
def get_undispatched_commits(component_id = nil)
  try_mongo do
    criteria =
      if component_id.nil?
        { 'dispatched' => false }
      else
        { 'dispatched' => false, 'component_id' => component_id }
      end
    oldest_first = [ 'commit_timestamp', Mongo::ASCENDING ]

    documents = persisted_commits.find(criteria, :sort => oldest_first, :batch_size => 100).to_a
    documents.map { |doc| MongoCommit.from_hash doc }
  end
end
#init ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/euston-eventstore/persistence/mongodb/mongo_persistence_engine.rb', line 108
# Ensures the indexes used by this engine exist: three on the commits
# collection and one on the stream heads collection.
#
# @return [MongoPersistenceEngine] self
def init
  try_mongo do
    dispatch_keys = [
      ['dispatched', Mongo::ASCENDING],
      ['component_id', Mongo::ASCENDING],
      ['commit_timestamp', Mongo::ASCENDING]
    ]
    revision_keys = [
      ['_id.stream_id', Mongo::ASCENDING],
      ['events.stream_revision', Mongo::ASCENDING]
    ]
    timestamp_keys = [ ['commit_timestamp', Mongo::ASCENDING] ]
    unsnapshotted_keys = [ ['unsnapshotted', Mongo::ASCENDING] ]

    persisted_commits.ensure_index dispatch_keys, :unique => false, :name => 'dispatched_index'
    # The only unique index: one (stream, revision) pair per commit event.
    persisted_commits.ensure_index revision_keys, :unique => true, :name => 'get_from_index'
    persisted_commits.ensure_index timestamp_keys, :unique => false, :name => 'commit_timestamp_index'
    persisted_stream_heads.ensure_index unsnapshotted_keys, :unique => false, :name => 'unsnapshotted_index'
  end

  self
end
#mark_commit_as_dispatched(commit) ⇒ Object
124 125 126 |
# File 'lib/euston-eventstore/persistence/mongodb/mongo_persistence_engine.rb', line 124
# Marks a single commit as dispatched by delegating to
# #mark_commits_as_dispatched with a one-element array.
#
# @param commit the commit to mark
def mark_commit_as_dispatched(commit)
  single_commit = [commit]
  mark_commits_as_dispatched(single_commit)
end
#mark_commits_as_dispatched(commits) ⇒ Object
128 129 130 131 132 133 134 135 136 137 |
# File 'lib/euston-eventstore/persistence/mongodb/mongo_persistence_engine.rb', line 128
# Sets 'dispatched' to true and removes 'component_id' on every stored commit
# matching one of the given commits' id queries. Does nothing for an empty list.
#
# @param commits collection of objects responding to +to_id_query+
def mark_commits_as_dispatched(commits)
  return if commits.empty?

  try_mongo do
    selector = { '$or' => commits.map(&:to_id_query) }
    changes = {
      '$set' => { 'dispatched' => true },
      '$unset' => { 'component_id' => 1 }
    }

    persisted_commits.update selector, changes, :multi => true
  end
end
#take_ownership_of_undispatched_commits(component_id) ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/euston-eventstore/persistence/mongodb/mongo_persistence_engine.rb', line 139
# Assigns +component_id+ to undispatched commits that are either unowned or
# still owned by some component while being older than 60 seconds.
#
# @param component_id value written to 'component_id' on every matched commit
def take_ownership_of_undispatched_commits(component_id)
  try_mongo do
    new_commits_eligible_for_dispatch = { 'component_id' => nil, 'dispatched' => false }

    # A commit counts as stuck once its timestamp is older than the cutoff.
    # This must be a range comparison: an equality match against a float
    # "now - 60" would practically never select anything.
    stuck_cutoff = Time.now.to_f - 60
    commits_stuck_in_other_components = { 'component_id' => { '$ne' => nil },
                                          'dispatched' => false,
                                          'commit_timestamp' => { '$lt' => stuck_cutoff } }

    query = { '$or' => [ new_commits_eligible_for_dispatch, commits_stuck_in_other_components ] }
    doc = { '$set' => { 'component_id' => component_id } }

    persisted_commits.update query, doc, :multi => true
  end
end