Class: RR::Replicators::TwoWayReplicator
- Inherits: Object
  - Object
  - RR::Replicators::TwoWayReplicator
- Defined in: lib/rubyrep/replicators/two_way_replicator.rb
Overview
This replicator implements two-way replication. Options:
- :left_change_handling, :right_change_handling: Handling of records that were changed only in the named database. Can be any of the following:
  - :ignore: No action.
  - :replicate: Updates the other database accordingly. Default setting.
  - Proc object: If a Proc object is given, it is responsible for dealing with the record. Called with the following parameters:
    - replication_helper: The current ReplicationHelper instance.
    - difference: A ReplicationDifference instance describing the change.
- :replication_conflict_handling: Handling of conflicting record changes. Can be any of the following:
  - :ignore: No action. Default setting.
  - :left_wins: The right database is updated accordingly.
  - :right_wins: The left database is updated accordingly.
  - :later_wins: The more recent change is replicated. (If both changes have the same age, the left change is replicated.)
  - :earlier_wins: The less recent change is replicated. (If both changes have the same age, the left change is replicated.)
  - Proc object: If a Proc object is given, it is responsible for dealing with the record. Called with the following parameters:
    - replication_helper: The current ReplicationHelper instance.
    - difference: A ReplicationDifference instance describing the changes.
- :logged_replication_events: Specifies which types of replications are logged. Either a single value or an array of values. Default: [:ignored_conflicts]. Possible values:
  - :ignored_changes: log ignored (not replicated) non-conflict changes
  - :all_changes: log all non-conflict changes
  - :ignored_conflicts: log ignored (not replicated) conflicts
  - :all_conflicts: log all conflicts
Example of using a Proc object for custom behaviour:
    lambda do |rep_helper, diff|
      # if specified as replication_conflict_handling option, logs all
      # conflicts to a text file
      File.open('/var/log/rubyrep_conflict_log', 'a') do |f|
        f.puts <<-end_str
          #{Time.now}: conflict
          * in table #{diff.changes[:left].table}
          * for record '#{diff.changes[:left].key}'
          * change type in left db: '#{diff.changes[:left].type}'
          * change type in right db: '#{diff.changes[:right].type}'
        end_str
      end
    end
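A handler of this shape can be tried without a live rubyrep session by calling it against stand-in objects. The following is a minimal sketch under that assumption: `FakeChange` and `FakeDiff` are hypothetical Structs that only mimic the `table`, `key`, `type` and `changes` readers used in the example, and the handler writes to a StringIO instead of a log file.

```ruby
require 'stringio'

# Hypothetical stand-ins; the real objects are supplied by rubyrep.
FakeChange = Struct.new(:table, :key, :type)
FakeDiff   = Struct.new(:type, :changes)

LOG = StringIO.new

# Same calling convention as the handler above: (replication helper, difference).
conflict_logger = lambda do |rep_helper, diff|
  left  = diff.changes[:left]
  right = diff.changes[:right]
  LOG.puts "conflict in table #{left.table} for record #{left.key.inspect}: " \
           "left=#{left.type}, right=#{right.type}"
end

diff = FakeDiff.new(:conflict, {
  :left  => FakeChange.new('users', { 'id' => 1 }, :update),
  :right => FakeChange.new('users', { 'id' => 1 }, :delete)
})

conflict_logger.call(nil, diff) # this handler does not use the helper
puts LOG.string
```

Because the handler only depends on the readers it calls, its formatting logic can be checked in isolation before it is configured as a :replication_conflict_handling value.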
Constant Summary
- OTHER_SIDE =
  Shortcut to calculate the "other" database.
    { :left => :right, :right => :left }
- CONFLICT_STATE_MATRIX =
  Specifies how to clear conflicts. The outer hash keys describe the type of the winning change. The inner hash keys describe the type of the losing change. The inner hash values describe the action to take on the losing side.
    {
      :insert => {:insert => :update, :update => :update, :delete => :insert},
      :update => {:insert => :update, :update => :update, :delete => :insert},
      :delete => {:insert => :delete, :update => :delete, :delete => :delete}
    }
- MAX_REPLICATION_ATTEMPTS =
  How often a replication will be attempted (in case it fails because the record in question was removed from the source or inserted into the target database after the ReplicationDifference was loaded).
    2
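The two lookup tables can be read together: OTHER_SIDE names the losing database, and CONFLICT_STATE_MATRIX names the action to run there. The sketch below copies both constants from this page; the `resolution` helper is a hypothetical name introduced only for illustration and is not part of rubyrep.

```ruby
# Constants copied from the summary above.
OTHER_SIDE = { :left => :right, :right => :left }
CONFLICT_STATE_MATRIX = {
  :insert => {:insert => :update, :update => :update, :delete => :insert},
  :update => {:insert => :update, :update => :update, :delete => :insert},
  :delete => {:insert => :delete, :update => :delete, :delete => :delete}
}

# Hypothetical helper: returns [losing database, action to run on it].
# change_types maps :left / :right to the change type seen on that side.
def resolution(winner_db, change_types)
  loser_db = OTHER_SIDE[winner_db]
  action = CONFLICT_STATE_MATRIX[change_types[winner_db]][change_types[loser_db]]
  [loser_db, action]
end

# Left updated the record, right deleted it, and left wins:
# the record has to be inserted again on the right.
p resolution(:left, { :left => :update, :right => :delete })

# Same conflict, but right wins: the record is deleted on the left.
p resolution(:right, { :left => :update, :right => :delete })
```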
Instance Attribute Summary
- #rep_helper ⇒ Object
  The current ReplicationHelper object.
Class Method Summary
- .default_options ⇒ Object
  Provides default options for the replicator.
Instance Method Summary
- #attempt_change(action, source_db, target_db, diff, remaining_attempts) ⇒ Object
  Helper for execution of insert / update / delete attempts.
- #attempt_delete(source_db, diff, remaining_attempts, target_key) ⇒ Object
  Attempts to delete the source record from the target database.
- #attempt_insert(source_db, diff, remaining_attempts, source_key) ⇒ Object
  Attempts to read the specified record from the source database and insert it into the target database.
- #attempt_update(source_db, diff, remaining_attempts, source_key, target_key) ⇒ Object
  Attempts to read the specified record from the source database and update the specified record in the target database.
- #clear_conflict(source_db, diff, remaining_attempts) ⇒ Object
  Helper function that clears a conflict by taking the change from the specified winning database and updating the other database accordingly.
- #initialize(rep_helper) ⇒ TwoWayReplicator (constructor)
  Initializes the TwoWayReplicator. Raises an ArgumentError if any of the replication options is invalid.
- #log_replication_outcome(winner, diff) ⇒ Object
  Logs replication of the specified difference as per the configured :logged_replication_events option.
- #replicate_difference(diff, remaining_attempts = MAX_REPLICATION_ATTEMPTS, previous_failure_description = nil) ⇒ Object
  Called to replicate the specified difference.
- #validate_change_handling_options ⇒ Object
  Verifies that the :left_change_handling / :right_change_handling options are valid.
- #validate_conflict_handling_options ⇒ Object
  Verifies that the given :replication_conflict_handling options are valid.
- #validate_logging_options ⇒ Object
  Verifies that the given :logged_replication_events option or options are valid.
- #verify_option(table_spec, valid_option_values, option_key, option_value) ⇒ Object
  Checks if an option is configured correctly.
Constructor Details
#initialize(rep_helper) ⇒ TwoWayReplicator
Initializes the TwoWayReplicator. Raises an ArgumentError if any of the replication options is invalid.
Parameters:
- rep_helper: The ReplicationHelper object providing information and utility functions.
    # File 'lib/rubyrep/replicators/two_way_replicator.rb', line 132
    def initialize(rep_helper)
      self.rep_helper = rep_helper

      validate_change_handling_options
      validate_conflict_handling_options
      validate_logging_options
    end
Instance Attribute Details
#rep_helper ⇒ Object
The current ReplicationHelper object.
    # File 'lib/rubyrep/replicators/two_way_replicator.rb', line 61
    def rep_helper
      @rep_helper
    end
Class Method Details
.default_options ⇒ Object
Provides default options for the replicator. Optional. Returns a hash with key => value pairs.
    # File 'lib/rubyrep/replicators/two_way_replicator.rb', line 65
    def self.default_options
      {
        :left_change_handling => :replicate,
        :right_change_handling => :replicate,
        :replication_conflict_handling => :ignore,
        :logged_replication_events => [:ignored_conflicts],
      }
    end
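To illustrate what these defaults mean for a configuration that overrides only one setting, the sketch below overlays user settings on a copy of the default hash. How rubyrep itself combines defaults with user configuration is not shown on this page, so the use of `Hash#merge` and the names `DEFAULTS` and `user_options` are assumptions made for illustration.

```ruby
# Defaults as returned by .default_options above.
DEFAULTS = {
  :left_change_handling => :replicate,
  :right_change_handling => :replicate,
  :replication_conflict_handling => :ignore,
  :logged_replication_events => [:ignored_conflicts]
}

# Hypothetical user settings; anything not listed keeps its default.
user_options = { :replication_conflict_handling => :later_wins }

# Assumption: a plain merge, with user values taking precedence.
effective = DEFAULTS.merge(user_options)

p effective[:replication_conflict_handling]
p effective[:left_change_handling]
```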
Instance Method Details
#attempt_change(action, source_db, target_db, diff, remaining_attempts) ⇒ Object
Helper for execution of insert / update / delete attempts. Wraps those attempts into savepoints and handles exceptions.
Note: Savepoints have to be used for PostgreSQL (as a failed SQL statement will otherwise invalidate the complete transaction).
- action: short description of the change (e.g. "update" or "delete")
- source_db: either :left or :right - source database of replication
- target_db: either :left or :right - target database of replication
- diff: the current ReplicationDifference instance
- remaining_attempts: the number of remaining replication attempts for this difference
    # File 'lib/rubyrep/replicators/two_way_replicator.rb', line 272
    def attempt_change(action, source_db, target_db, diff, remaining_attempts)
      begin
        rep_helper.session.send(target_db).execute "savepoint rr_#{action}_#{remaining_attempts}"
        yield
        unless rep_helper.new_transaction?
          rep_helper.session.send(target_db).execute "release savepoint rr_#{action}_#{remaining_attempts}"
        end
      rescue Exception => e
        rep_helper.session.send(target_db).execute "rollback to savepoint rr_#{action}_#{remaining_attempts}"
        diff.amend
        replicate_difference diff, remaining_attempts - 1,
          "#{action} failed with #{e.message}"
      end
    end
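The savepoint pattern described above can be observed in isolation with a connection that only records the statements it receives. This is a simplified sketch, not the rubyrep implementation: `RecordingConnection` and `with_savepoint` are hypothetical names, the savepoint is always released on success (the listing above releases it only under a `new_transaction?` condition), `StandardError` is rescued instead of `Exception`, and the retry step is replaced by returning the error message.

```ruby
# Hypothetical connection that only records the SQL it is asked to run.
class RecordingConnection
  attr_reader :statements

  def initialize
    @statements = []
  end

  def execute(sql)
    @statements << sql
  end
end

# Runs the block inside a savepoint. On failure, rolls back to the savepoint
# and returns the error message; on success returns nil.
def with_savepoint(connection, name)
  connection.execute "savepoint #{name}"
  yield
  connection.execute "release savepoint #{name}"
  nil
rescue StandardError => e
  connection.execute "rollback to savepoint #{name}"
  e.message
end

ok = RecordingConnection.new
with_savepoint(ok, 'rr_insert_2') { ok.execute 'insert ...' }

failing = RecordingConnection.new
error = with_savepoint(failing, 'rr_insert_2') { raise 'duplicate key' }

p ok.statements
p failing.statements
p error
```

The failing run issues a rollback to the savepoint rather than aborting, which is what keeps the surrounding transaction usable for the next attempt.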
#attempt_delete(source_db, diff, remaining_attempts, target_key) ⇒ Object
Attempts to delete the source record from the target database. For example, if source_db is :left, then the record is deleted in database :right.
- source_db: either :left or :right - source database of replication
- diff: the current ReplicationDifference instance
- remaining_attempts: the number of remaining replication attempts for this difference
- target_key: a column_name => value hash identifying the target record
    # File 'lib/rubyrep/replicators/two_way_replicator.rb', line 294
    def attempt_delete(source_db, diff, remaining_attempts, target_key)
      change = diff.changes[source_db]
      target_db = OTHER_SIDE[source_db]
      target_table = rep_helper.corresponding_table(source_db, change.table)

      attempt_change('delete', source_db, target_db, diff, remaining_attempts) do
        number_updated = rep_helper.delete_record target_db, target_table, target_key
        if number_updated == 0
          diff.amend
          replicate_difference diff, remaining_attempts - 1, "target record for delete vanished"
        else
          log_replication_outcome source_db, diff
        end
      end
    end
#attempt_insert(source_db, diff, remaining_attempts, source_key) ⇒ Object
Attempts to read the specified record from the source database and insert it into the target database. Retries if the insert fails due to a missing source record or a suddenly existing target record.
- source_db: either :left or :right - source database of replication
- diff: the current ReplicationDifference instance
- remaining_attempts: the number of remaining replication attempts for this difference
- source_key: a column_name => value hash identifying the source record
    # File 'lib/rubyrep/replicators/two_way_replicator.rb', line 211
    def attempt_insert(source_db, diff, remaining_attempts, source_key)
      source_change = diff.changes[source_db]
      source_table = source_change.table
      target_db = OTHER_SIDE[source_db]
      target_table = rep_helper.corresponding_table(source_db, source_table)

      values = rep_helper.load_record source_db, source_table, source_key
      if values == nil
        diff.amend
        replicate_difference diff, remaining_attempts - 1, "source record for insert vanished"
      else
        attempt_change('insert', source_db, target_db, diff, remaining_attempts) do
          rep_helper.insert_record target_db, target_table, values
          log_replication_outcome source_db, diff
        end
      end
    end
#attempt_update(source_db, diff, remaining_attempts, source_key, target_key) ⇒ Object
Attempts to read the specified record from the source database and update the specified record in the target database. Retries if the update fails due to a missing source or target record.
- source_db: either :left or :right - source database of replication
- diff: the current ReplicationDifference instance
- remaining_attempts: the number of remaining replication attempts for this difference
- source_key: a column_name => value hash identifying the source record
- target_key: a column_name => value hash identifying the target record
    # File 'lib/rubyrep/replicators/two_way_replicator.rb', line 237
    def attempt_update(source_db, diff, remaining_attempts, source_key, target_key)
      source_change = diff.changes[source_db]
      source_table = source_change.table
      target_db = OTHER_SIDE[source_db]
      target_table = rep_helper.corresponding_table(source_db, source_table)

      values = rep_helper.load_record source_db, source_table, source_key
      if values == nil
        diff.amend
        replicate_difference diff, remaining_attempts - 1, "source record for update vanished"
      else
        attempt_change('update', source_db, target_db, diff, remaining_attempts) do
          number_updated = rep_helper.update_record target_db, target_table, values, target_key
          if number_updated == 0
            diff.amend
            replicate_difference diff, remaining_attempts - 1, "target record for update vanished"
          else
            log_replication_outcome source_db, diff
          end
        end
      end
    end
#clear_conflict(source_db, diff, remaining_attempts) ⇒ Object
Helper function that clears a conflict by taking the change from the specified winning database and updating the other database accordingly.
- source_db: the winning database (either :left or :right)
- diff: the ReplicationDifference instance
- remaining_attempts: the number of remaining replication attempts for this difference
    # File 'lib/rubyrep/replicators/two_way_replicator.rb', line 161
    def clear_conflict(source_db, diff, remaining_attempts)
      source_change = diff.changes[source_db]
      target_db = OTHER_SIDE[source_db]
      target_change = diff.changes[target_db]

      target_action = CONFLICT_STATE_MATRIX[source_change.type][target_change.type]
      source_key = source_change.type == :update ? source_change.new_key : source_change.key
      target_key = target_change.type == :update ? target_change.new_key : target_change.key
      case target_action
      when :insert
        attempt_insert source_db, diff, remaining_attempts, source_key
      when :update
        attempt_update source_db, diff, remaining_attempts, source_key, target_key
      when :delete
        attempt_delete source_db, diff, remaining_attempts, target_key
      end
    end
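The key selection in clear_conflict uses `new_key` for update changes and `key` otherwise, presumably because an update may have changed the key columns (this page does not state the reason). The sketch below mirrors that ternary; `KeyedChange` and `current_key` are hypothetical names that only mimic the `type`, `key` and `new_key` readers used above.

```ruby
# Hypothetical stand-in mimicking the type / key / new_key readers used above.
KeyedChange = Struct.new(:type, :key, :new_key)

# Mirrors the ternaries in clear_conflict: updates are addressed by new_key.
def current_key(change)
  change.type == :update ? change.new_key : change.key
end

rekeyed = KeyedChange.new(:update, { 'id' => 1 }, { 'id' => 7 })
deleted = KeyedChange.new(:delete, { 'id' => 1 }, nil)

p current_key(rekeyed) == { 'id' => 7 }
p current_key(deleted) == { 'id' => 1 }
```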
#log_replication_outcome(winner, diff) ⇒ Object
Logs replication of the specified difference as per the configured :logged_replication_events option.
- winner: either the winner database (:left or :right) or :ignore
- diff: the ReplicationDifference instance
    # File 'lib/rubyrep/replicators/two_way_replicator.rb', line 183
    def log_replication_outcome(winner, diff)
      options = rep_helper.options_for_table(diff.changes[:left].table)
      option_values = [options[:logged_replication_events]].flatten # make sure I have an array
      if diff.type == :conflict
        return unless option_values.include?(:all_conflicts) or option_values.include?(:ignored_conflicts)
        return if winner != :ignore and not option_values.include?(:all_conflicts)
        outcome = {:left => 'left_won', :right => 'right_won', :ignore => 'ignored'}[winner]
      else
        return unless option_values.include?(:all_changes) or option_values.include?(:ignored_changes)
        return if winner != :ignore and not option_values.include?(:all_changes)
        outcome = winner == :ignore ? 'ignored' : 'replicated'
      end
      rep_helper.log_replication_outcome diff, outcome
    end
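The filtering rules above can be expressed as a pure function, which makes it easier to see which combinations produce a log entry. This is a sketch only: `outcome_to_log` is a hypothetical helper that takes the option value, the difference type and the winner directly instead of reading them from rubyrep objects.

```ruby
# Returns the outcome string that would be logged, or nil if nothing is logged.
# logged_events: a single symbol or an array of symbols
# diff_type: :conflict, :left or :right; winner: :left, :right or :ignore
def outcome_to_log(logged_events, diff_type, winner)
  events = [logged_events].flatten # accept a single value or an array
  if diff_type == :conflict
    all, ignored = :all_conflicts, :ignored_conflicts
  else
    all, ignored = :all_changes, :ignored_changes
  end
  return nil unless events.include?(all) || events.include?(ignored)
  return nil if winner != :ignore && !events.include?(all)

  if diff_type == :conflict
    { :left => 'left_won', :right => 'right_won', :ignore => 'ignored' }[winner]
  else
    winner == :ignore ? 'ignored' : 'replicated'
  end
end

p outcome_to_log(:ignored_conflicts, :conflict, :ignore) # ignored conflict: logged
p outcome_to_log(:ignored_conflicts, :conflict, :left)   # resolved conflict: not logged
p outcome_to_log([:all_changes], :left, :left)           # replicated change: logged
```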
#replicate_difference(diff, remaining_attempts = MAX_REPLICATION_ATTEMPTS, previous_failure_description = nil) ⇒ Object
Called to replicate the specified difference.
- diff: ReplicationDifference instance
- remaining_attempts: how many more times a replication will be attempted
- previous_failure_description: why the previous replication attempt failed
    # File 'lib/rubyrep/replicators/two_way_replicator.rb', line 314
    def replicate_difference(diff, remaining_attempts = MAX_REPLICATION_ATTEMPTS, previous_failure_description = nil)
      raise Exception, previous_failure_description || "max replication attempts exceeded" if remaining_attempts == 0
      options = rep_helper.options_for_table(diff.changes[:left].table)
      if diff.type == :left or diff.type == :right
        key = diff.type == :left ? :left_change_handling : :right_change_handling
        option = options[key]

        if option == :ignore
          log_replication_outcome :ignore, diff
        elsif option == :replicate
          source_db = diff.type

          change = diff.changes[source_db]

          case change.type
          when :insert
            attempt_insert source_db, diff, remaining_attempts, change.key
          when :update
            attempt_update source_db, diff, remaining_attempts, change.new_key, change.key
          when :delete
            attempt_delete source_db, diff, remaining_attempts, change.key
          end
        else # option must be a Proc
          option.call rep_helper, diff
        end
      elsif diff.type == :conflict
        option = options[:replication_conflict_handling]
        if option == :ignore
          log_replication_outcome :ignore, diff
        elsif option == :left_wins
          clear_conflict :left, diff, remaining_attempts
        elsif option == :right_wins
          clear_conflict :right, diff, remaining_attempts
        elsif option == :later_wins
          winner_db = diff.changes[:left].last_changed_at >= diff.changes[:right].last_changed_at ? :left : :right
          clear_conflict winner_db, diff, remaining_attempts
        elsif option == :earlier_wins
          winner_db = diff.changes[:left].last_changed_at <= diff.changes[:right].last_changed_at ? :left : :right
          clear_conflict winner_db, diff, remaining_attempts
        else # option must be a Proc
          option.call rep_helper, diff
        end
      end
    end
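The :later_wins and :earlier_wins branches differ only in the comparison operator, and both use an inclusive comparison so that a tie goes to the left database. The sketch below isolates that choice; `TimedChange` and `conflict_winner` are hypothetical names, and only the `last_changed_at` reader from the listing above is assumed.

```ruby
# Hypothetical stand-in for a change; only last_changed_at is used here.
TimedChange = Struct.new(:last_changed_at)

# Picks the winning database using the same comparisons as replicate_difference.
def conflict_winner(strategy, left, right)
  case strategy
  when :later_wins
    left.last_changed_at >= right.last_changed_at ? :left : :right
  when :earlier_wins
    left.last_changed_at <= right.last_changed_at ? :left : :right
  else
    raise ArgumentError, "unsupported strategy #{strategy.inspect}"
  end
end

t = Time.utc(2024, 1, 1, 12, 0, 0)
older = TimedChange.new(t)
newer = TimedChange.new(t + 60)

p conflict_winner(:later_wins, older, newer)   # right side changed later
p conflict_winner(:earlier_wins, older, newer) # left side changed earlier
p conflict_winner(:later_wins, older, older)   # tie goes to the left
```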
#validate_change_handling_options ⇒ Object
Verifies that the :left_change_handling / :right_change_handling options are valid. Raises an ArgumentError if an option is invalid.
    # File 'lib/rubyrep/replicators/two_way_replicator.rb', line 91
    def validate_change_handling_options
      [:left_change_handling, :right_change_handling].each do |key|
        rep_helper.session.configuration.each_matching_option(key) do |table_spec, value|
          unless value.respond_to? :call
            verify_option table_spec, [:ignore, :replicate], key, value
          end
        end
      end
    end
#validate_conflict_handling_options ⇒ Object
Verifies that the given :replication_conflict_handling options are valid. Raises an ArgumentError if an option is invalid.
    # File 'lib/rubyrep/replicators/two_way_replicator.rb', line 103
    def validate_conflict_handling_options
      rep_helper.session.configuration.each_matching_option(:replication_conflict_handling) do |table_spec, value|
        unless value.respond_to? :call
          verify_option table_spec,
            [:ignore, :left_wins, :right_wins, :later_wins, :earlier_wins],
            :replication_conflict_handling, value
        end
      end
    end
#validate_logging_options ⇒ Object
Verifies that the given :logged_replication_events option or options are valid. Raises an ArgumentError if an option is invalid.
    # File 'lib/rubyrep/replicators/two_way_replicator.rb', line 115
    def validate_logging_options
      rep_helper.session.configuration.each_matching_option(:logged_replication_events) do |table_spec, values|
        values = [values].flatten # ensure that I have an array
        values.each do |value|
          verify_option table_spec,
            [:ignored_changes, :all_changes, :ignored_conflicts, :all_conflicts],
            :logged_replication_events, value
        end
      end
    end
#verify_option(table_spec, valid_option_values, option_key, option_value) ⇒ Object
Checks if an option is configured correctly. Raises an ArgumentError if not.
- table_spec: the table specification to which the option belongs. May be nil.
- valid_option_values: array of valid option values
- option_key: the key of the option that is to be checked
- option_value: the value of the option that is to be checked
    # File 'lib/rubyrep/replicators/two_way_replicator.rb', line 79
    def verify_option(table_spec, valid_option_values, option_key, option_value)
      unless valid_option_values.include? option_value
        message = ""
        message << "#{table_spec.inspect}: " if table_spec
        message << "#{option_value.inspect} not a valid #{option_key.inspect} option"
        raise ArgumentError.new(message)
      end
    end
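To see the kind of error message this check produces, the following sketch re-implements it as a standalone method so that it runs without any rubyrep objects. The table name 'users' and the misspelled value :replicat are made-up inputs; `String.new` is used instead of a string literal so the buffer is mutable regardless of frozen-string settings.

```ruby
# Standalone variant of the check shown above (no rubyrep objects required).
def verify_option(table_spec, valid_option_values, option_key, option_value)
  return if valid_option_values.include? option_value

  message = String.new
  message << "#{table_spec.inspect}: " if table_spec
  message << "#{option_value.inspect} not a valid #{option_key.inspect} option"
  raise ArgumentError, message
end

# A valid value passes silently.
verify_option 'users', [:ignore, :replicate], :left_change_handling, :replicate

# A misspelled value raises, and the message names the table, value and key.
begin
  verify_option 'users', [:ignore, :replicate], :left_change_handling, :replicat
rescue ArgumentError => e
  puts e.message
end
```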