Class: Nsync::Consumer
Inherits: Object
- Object
- Nsync::Consumer
Defined in: lib/nsync/consumer.rb
Overview
The Nsync::Consumer is used to handle the consumption of data from an Nsync repo for the entire app. It reads in the differences between the current version of data in the database and the new data from the producer, finding and notifying all affected classes and objects.
Basic Usage:
Nsync::Config.run do |c|
  # The consumer uses a read-only, bare repository (one ending in .git).
  # It is created automatically if it does not exist.
  c.repo_path = "/local/path/to/hold/data.git"

  # The remote repository URL from which to pull data
  c.repo_url = "git@examplegithost:username/data.git"

  # An object that implements the VersionManager interface
  # (see Nsync::GitVersionManager for an example)
  c.version_manager = MyCustomVersionManager.new

  # A lock file path to use for this app
  c.lock_file = "/tmp/app_name_nsync.lock"

  # The class mapping maps the class names of the producer classes to
  # the class names of their associated consuming classes. A producer can
  # map to one or many consumers, and a consumer can be mapped to one or many
  # producers. Consumer classes should implement the Consumer interface.
  c.map_class "RawDataPostClass", "Post"
  c.map_class "RawDataInfo", "Info"
end

# Create a new consumer object. This will clone the repo if needed.
consumer = Nsync::Consumer.new

# Update this app to the latest data, pulling if necessary.
consumer.update

# Roll back the last change.
consumer.rollback
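The method listings on this page call the configured version manager in three ways: `version`, `version=`, and `previous_version`. The following is a minimal in-memory sketch of an object answering those three calls. The class name `InMemoryVersionManager` is hypothetical and not part of Nsync; a real implementation would persist its state, and the full VersionManager interface may require more than is shown here.

```ruby
# Hypothetical in-memory version manager; not part of Nsync.
class InMemoryVersionManager
  def initialize(initial_version = nil)
    @history = []
    @history << initial_version if initial_version
  end

  # The commit id currently loaded into the datastore
  def version
    @history.last
  end

  # Records a newly loaded commit id
  def version=(commit_id)
    @history << commit_id
  end

  # The commit id that was loaded before the current one
  def previous_version
    @history[-2]
  end
end

manager = InMemoryVersionManager.new("aaa111")
manager.version = "bbb222"
puts manager.version          # prints bbb222
puts manager.previous_version # prints aaa111
```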
Direct Known Subclasses
Defined Under Namespace
Classes: Change, CouldNotInitializeRepoError
Instance Attribute Summary
-
#repo ⇒ Object
Returns the value of attribute repo.
Instance Method Summary
-
#after_class_finished(klass, l) ⇒ Object
Adds a callback to the list of callbacks to occur after main processing of the class specified by ‘klass’.
-
#after_current_class_finished(l) ⇒ Object
Adds a callback to the list of callbacks to occur after main processing of the class that is currently being processed.
-
#after_finished(l) ⇒ Object
Adds a callback to the list of callbacks to occur after all changes have been applied.
-
#apply_changes(a, b) ⇒ Object
Translates and applies the changes between commit id ‘a’ and commit id ‘b’ to the datastore.
- #changes(a, b) ⇒ Object
- #config ⇒ Nsync::Config
-
#first_commit ⇒ Object
Gets the first commit id in the repo.
-
#initialize ⇒ Consumer
constructor
Sets the repository to the repo at config.repo_path.
- #latest_changes ⇒ Object
-
#remotes ⇒ Object
Lists the configured data remotes in the repo.
-
#reprocess_class!(klass) ⇒ Object
Reprocesses all changes from the start of the repo to the current version for the class klass. Queues are not cleared, so you can use this to do powerful data reconstruction.
-
#rollback ⇒ Object
Rolls back data to the previously loaded version.
-
#update ⇒ Object
Updates the data to the latest version.
Constructor Details
#initialize ⇒ Consumer
Sets the repository to the repo at config.repo_path.
If config.repo_url is set and the directory at config.repo_path does not exist yet, a new bare repository will be cloned from config.repo_url.
# File 'lib/nsync/consumer.rb', line 49
def initialize
  unless get_or_create_repo
    raise CouldNotInitializeRepoError
  end
end
Instance Attribute Details
#repo ⇒ Object
Returns the value of attribute repo.
# File 'lib/nsync/consumer.rb', line 40
def repo
  @repo
end
Instance Method Details
#after_class_finished(klass, l) ⇒ Object
Adds a callback to the list of callbacks to occur after main processing of the class specified by ‘klass’. Can be used to handle data relations between objects of the same class.
Example:
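class Post
  def nsync_update(consumer, event_type, filename, data)
    #... normal data update stuff ...
    post = self
    # related_ids holds the source ids listed in the data file
    related_ids = data['related_post_ids']
    consumer.after_class_finished(Post, lambda {
      posts = Post.all(:conditions =>
        {:source_id => related_ids})
      # assumes Post defines a related_posts= writer
      post.related_posts = posts
    })
  end
end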
# File 'lib/nsync/consumer.rb', line 207
def after_class_finished(klass, l)
  config.log.info("[NSYNC] Added callback to run after class '#{klass}'")
  @after_class_finished_queues[klass] ||= []
  @after_class_finished_queues[klass] << l
end
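The queueing in this listing can be tried in isolation. This is a minimal sketch and not Nsync::Consumer itself: the class `QueueSketch` and its method `run_after_class_finished` are hypothetical, and running the lambdas in insertion order is an assumption about how the consumer drains the queue.

```ruby
# Hypothetical stand-in that mimics the per-class queues; not Nsync::Consumer.
class QueueSketch
  def initialize
    @after_class_finished_queues = {}
  end

  # Same shape as the listing, minus logging
  def after_class_finished(klass, l)
    @after_class_finished_queues[klass] ||= []
    @after_class_finished_queues[klass] << l
  end

  # Assumed behavior: call the lambdas queued for klass in insertion order
  def run_after_class_finished(klass)
    (@after_class_finished_queues[klass] || []).each(&:call)
  end
end

log = []
sketch = QueueSketch.new
sketch.after_class_finished(String, lambda { log << "first" })
sketch.after_class_finished(String, lambda { log << "second" })
sketch.after_class_finished(Integer, lambda { log << "other class" })

sketch.run_after_class_finished(String)
p log # prints ["first", "second"]
```

Only the callbacks registered for `String` run; the one registered for `Integer` stays queued until that class is finished.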
#after_current_class_finished(l) ⇒ Object
Adds a callback to the list of callbacks to occur after main processing of the class that is currently being processed. This is essentially an alias for after_class_finished applied to the current class.
# File 'lib/nsync/consumer.rb', line 218
def after_current_class_finished(l)
  after_class_finished(@current_class_for_queue, l)
end
#after_finished(l) ⇒ Object
Adds a callback to the list of callbacks to occur after all changes have been applied. This queue runs immediately before the current version is updated.
# File 'lib/nsync/consumer.rb', line 228
def after_finished(l)
  config.log.info("[NSYNC] Added callback to run at the end of the update")
  @after_finished_queue ||= []
  @after_finished_queue << l
end
#apply_changes(a, b) ⇒ Object
Translates and applies the changes between commit id ‘a’ and commit id ‘b’ to the datastore. This is used internally by rollback and update. Don’t use this unless you absolutely know what you are doing.
If you must call this directly, understand that ‘a’ should almost always be the commit id of the current data that is loaded into the database. ‘b’ can be any commit in the graph, forward or backwards.
# File 'lib/nsync/consumer.rb', line 104
def apply_changes(a, b)
  return false if a == b
  config.lock do
    config.log.info("[NSYNC] Moving Nsync::Consumer from '#{a}' to '#{b}'")
    clear_queues
    diffs = nil
    diffs = repo.diff(a, b)

    changeset = changeset_from_diffs(diffs)

    if config.ordering
      config.ordering.each do |klass|
        klass = begin
          CoreExtensions.constantize(klass)
        rescue NameError => e
          config.log.warn("[NSYNC] Could not find class '#{klass}' from ordering; skipping")
          false
        end
        if klass
          changes = changeset[klass]
          if changes
            apply_changes_for_class(klass, changes)
          end
        end
      end
    else
      changeset.each do |klass, changes|
        apply_changes_for_class(klass, changes)
      end
    end
    run_after_finished
    clear_queues
    config.version_manager.version = b
  end
end
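The ordered branch of this listing can be illustrated in isolation. This is a minimal sketch, not the library's code: the helper `ordered_classes` is hypothetical, `Object.const_get` stands in for `CoreExtensions.constantize`, and the changeset is a plain hash keyed by class. It shows that names which do not resolve are skipped and that, as the listing reads, classes absent from the ordering are not applied when an ordering is configured.

```ruby
# Hypothetical helper mirroring the ordered branch of apply_changes.
# Returns the classes that would be processed, in processing order.
def ordered_classes(ordering, changeset)
  ordering.each_with_object([]) do |name, applied|
    klass = begin
      Object.const_get(name) # stand-in for CoreExtensions.constantize
    rescue NameError
      nil # names that do not resolve to a class are skipped
    end
    applied << klass if klass && changeset[klass]
  end
end

changeset = {
  String  => [:change_a],
  Integer => [:change_b],
  Symbol  => [:change_c] # has changes but is not in the ordering
}
ordering = ["Integer", "NoSuchClassAnywhere", "String"]

p ordered_classes(ordering, changeset) # prints [Integer, String]
```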
#changes(a, b) ⇒ Object
# File 'lib/nsync/consumer.rb', line 89
def changes(a, b)
  diffs = repo.diff(a,b)
  changeset_from_diffs(diffs)
end
#config ⇒ Nsync::Config
# File 'lib/nsync/consumer.rb', line 79
def config
  Nsync.config
end
#first_commit ⇒ Object
Gets the first commit id in the repo.
# File 'lib/nsync/consumer.rb', line 242
def first_commit
  self.repo.git.rev_list({:reverse => true}, "master").split("\n").first
end
#latest_changes ⇒ Object
# File 'lib/nsync/consumer.rb', line 83
def latest_changes
  update_repo &&
    changes(config.version_manager.version, repo.head.commit.id)
end
#remotes ⇒ Object
Lists the configured data remotes in the repo.
# File 'lib/nsync/consumer.rb', line 235
def remotes
  repo.git.remote({:v => true}).split("\n").map do |line|
    line.split(/\s+/)
  end
end
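To show the shape of the value #remotes returns, the same split logic can be applied to sample text. This is a sketch under assumptions: the string below imitates typical `git remote -v` output, and the remote name and URL are made up for illustration.

```ruby
# Sample text in the shape of `git remote -v` output (illustrative values)
sample = "origin\tgit@examplegithost:username/data.git (fetch)\n" \
         "origin\tgit@examplegithost:username/data.git (push)\n"

# Same parsing as the listing: one array of fields per output line
remotes = sample.split("\n").map do |line|
  line.split(/\s+/)
end

remotes.each { |name, url, kind| puts "#{name} #{kind} #{url}" }
# prints:
# origin (fetch) git@examplegithost:username/data.git
# origin (push) git@examplegithost:username/data.git
```

Each entry is a three-element array of name, URL, and direction, so callers can destructure it as shown.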
#reprocess_class!(klass) ⇒ Object
Reprocesses all changes from the start of the repo to the current version for the class klass. Queues are not cleared, so you can use this to do powerful data reconstruction. You can also shoot your foot off. Be very careful.
# File 'lib/nsync/consumer.rb', line 144
def reprocess_class!(klass)
  diffs = repo.diff(first_commit, config.version_manager.version)
  changeset = changeset_from_diffs(diffs)

  changes = changeset[klass]
  if changes
    apply_changes_for_class(klass, changes)
  end
end
#rollback ⇒ Object
Rolls back data to the previously loaded version.
NOTE: If you roll back and then update, the ‘bad’ commit will be reloaded. This is primarily meant as a way to get back to a known good state quickly while the issues are fixed in the producer.
# File 'lib/nsync/consumer.rb', line 73
def rollback
  apply_changes(config.version_manager.version,
                config.version_manager.previous_version)
end
#update ⇒ Object
Updates the data to the latest version.
If the repo has a remote origin, the latest changes will be fetched.
NOTE: It is critical that the version_manager returns correct results, because this method moves from the commit the version_manager reports as currently loaded to HEAD.
# File 'lib/nsync/consumer.rb', line 62
def update
  update_repo &&
    apply_changes(config.version_manager.version, repo.head.commit.id)
end
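The interaction between #update and #rollback described in the notes can be sketched with a stand-in object. This is a simplified illustration under stated assumptions, not Nsync::Consumer: the class `SketchConsumer` is hypothetical, commit ids are plain strings, no repository or datastore is touched, and returning `true` from a successful `apply_changes` is an assumption (the real method returns whatever its lock block returns).

```ruby
# Hypothetical stand-in mirroring the update/rollback listings.
class SketchConsumer
  attr_reader :applied

  def initialize(head:, loaded:, previous:)
    @head = head         # plays the role of repo.head.commit.id
    @version = loaded    # plays the role of version_manager.version
    @previous = previous # plays the role of version_manager.previous_version
    @applied = []
  end

  def apply_changes(a, b)
    return false if a == b # same guard as the real listing
    @applied << [a, b]
    @previous = a
    @version = b
    true # assumed return value for this sketch
  end

  def update
    apply_changes(@version, @head)
  end

  def rollback
    apply_changes(@version, @previous)
  end
end

consumer = SketchConsumer.new(head: "c3", loaded: "c3", previous: "c2")
p consumer.update  # prints false, already at HEAD
consumer.rollback  # moves from c3 back to c2
consumer.update    # moves from c2 forward to c3 again
p consumer.applied # prints [["c3", "c2"], ["c2", "c3"]]
```

The second update reapplies the commit that was just rolled back, which is why a rollback is only a stopgap until the producer publishes a fix.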