Module: RightScale::GlobalObjectReplicatorSource
- Includes:
- InfrastructureHelpers, ModelsHelper
- Defined in:
- lib/right_infrastructure_agent/global_object_replicator_source.rb
Overview
This module is intended as a mixin for an HTTP controller that handles requests from a global object replication sink. Each request asks whether replication is required for a global object owned by this server.
This module expects the following to be defined:
- logger - variable pointing to the standard logger in use
- global_object_replicator_source - method returning type of replication source, e.g., 'library'
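A minimal sketch of a host controller that satisfies these expectations. The class name `ReplicationController` is hypothetical, and the `include` is guarded so that the snippet runs even when the right_infrastructure_agent code is not loaded. The method source further down also reads `params`, which this sketch assumes the surrounding HTTP framework supplies.

```ruby
require 'logger'

# Hypothetical host controller; the class name is illustrative only.
class ReplicationController
  # Mix in the module only when it has actually been loaded.
  if defined?(RightScale::GlobalObjectReplicatorSource)
    include RightScale::GlobalObjectReplicatorSource
  end

  # Expected by the mixin: the standard logger in use.
  attr_reader :logger

  def initialize(logger = Logger.new($stdout))
    @logger = logger
  end

  # Expected by the mixin: the type of replication source.
  def global_object_replicator_source
    'library'
  end
end
```

Exposing `logger` through `attr_reader` is one way to provide it; an instance variable wrapped by any reader method should work equally well, since the mixin only calls `logger`.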
Constant Summary
Constants included from ModelsHelper
ModelsHelper::DEFAULT_RETRY_MESSAGE, ModelsHelper::DISABLED_SHARD_RETRY_MESSAGE, ModelsHelper::RETRYABLE_ERRORS, ModelsHelper::WRONG_SHARD_ERROR, ModelsHelper::WRONG_SHARD_RETRY_MESSAGE
Instance Method Summary
- #verify_replica_range ⇒ NilClass
Compares the checksum parameter with the local state of a replicated table and sends back a replicator/synchronize_replica_range request with records that may need to be updated in the core DB.
Methods included from ModelsHelper
#account, #audit_entry, #instance, #instance_from_agent_id, #instance_from_token_id, #instance_token, #is_retryable_error?, #permission, #query, #recipe, #recipe_from_name, #repositories, #retrieve, #retrieve_or_create_audit, #retrieve_or_default_user, #right_script, #right_script_from_name, #run_query, #setting, #user_credential, #user_credential_from_fingerprint
Methods included from InfrastructureHelpers
#constantize, #format_error, #render_nothing, #to_bool, #to_int_or_nil
Instance Method Details
#verify_replica_range ⇒ NilClass
Compares the checksum parameter with the local state of a replicated table and sends back a replicator/synchronize_replica_range request with records that may need to be updated in the core DB.
# File 'lib/right_infrastructure_agent/global_object_replicator_source.rb', line 51

def verify_replica_range
  global_object_class = constantize(params[:class_name])
  schema_version = to_int_or_nil(params[:schema_version])
  checksum_type = params[:checksum_type]
  max_id_at_start = to_int_or_nil(params[:max_id_at_start])
  begin_id = to_int_or_nil(params[:begin_id])
  end_id = to_int_or_nil(params[:end_id])
  send_records_on_checksum_mismatch = params[:send_records_on_checksum_mismatch]
  checksum_value = params[:checksum_value]
  shard_id = to_int_or_nil(params[:shard_id])

  success = query("verify_replica_range for #{global_object_class.name} rows #{begin_id} " +
                  " - #{end_id}", :email_errors => true) do
    records_to_synchronize = []
    if (global_object_class.calculate_global_object_checksum(checksum_type, schema_version, begin_id, end_id) == checksum_value)
      checksum_matched = true
      logger.debug("GlobalObjectReplica: Verified #{global_object_class.name} global_object_version_sum " +
                   "for rows #{begin_id} - #{end_id}")
    else
      checksum_matched = false
      logger.debug("GlobalObjectReplica: Verification of #{global_object_class.name} global_object_version_sum " +
                   "rows #{begin_id} - #{end_id} failed. Sending synchronization records.")
      records_to_synchronize = if send_records_on_checksum_mismatch
        global_object_class.get_initialization_hashes(schema_version, begin_id, end_id)
      else
        []
      end
    end

    replicator = case params[:sink]
    # TODO Replace this with "#{params[:sink]}_replicator" once all replicator actors gone
    when "core" then "core_replicator"
    else raise ArgumentError, "Unknown replication sink: #{params[:sink].inspect}"
    end

    payload = {
      :source => global_object_replicator_source,
      :class_name => global_object_class.name,
      :checksum_type => checksum_type,
      :max_id_at_start => max_id_at_start,
      :begin_id => begin_id,
      :end_id => end_id,
      :checksum_matched => checksum_matched,
      :records_to_synchronize => records_to_synchronize,
      :has_more => global_object_class.max_id > end_id
    }

    EM.next_tick do
      # Execute this request on next_tick since, depending on the configuration, the router could route
      # this request directly via HTTP and there would be no break in the chain of replication requests
      begin
        RightScale::RightHttpClient.push("/#{replicator}/synchronize_replica_range", payload, :scope => {:shard => shard_id})
      rescue Exception => e
        logger.error(format_error("Failed to synchronize_replica_range for class #{global_object_class.name}", e))
      end
    end
    true
  end

  raise RightScale::Exceptions::QueryFailure.new(@last_error) unless success
  render_nothing
end
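The comparison step in the listing above can be sketched in isolation. This is an illustration under stated assumptions, not the library's implementation: the in-memory `ROWS` table, the `version_sum` checksum (a plain sum of row versions, loosely suggested by the `global_object_version_sum` wording in the log messages), and the `build_reply` helper are all hypothetical. Only the payload keys are taken from the method source.

```ruby
# Hypothetical in-memory stand-in for a replicated table: id => version.
ROWS = { 1 => 3, 2 => 1, 3 => 7, 4 => 2 }.freeze

# Assumed checksum: sum of row versions in the inclusive id range.
def version_sum(rows, begin_id, end_id)
  rows.select { |id, _| id.between?(begin_id, end_id) }.values.sum
end

# Builds the part of the reply that depends on the checksum comparison.
def build_reply(rows, begin_id, end_id, checksum_value, send_records)
  matched = version_sum(rows, begin_id, end_id) == checksum_value
  # Records are attached only on a mismatch, and only if the sink asked for them.
  records = if !matched && send_records
    rows.select { |id, _| id.between?(begin_id, end_id) }
        .map { |id, version| { :id => id, :version => version } }
  else
    []
  end
  {
    :begin_id => begin_id,
    :end_id => end_id,
    :checksum_matched => matched,
    :records_to_synchronize => records,
    :has_more => rows.keys.max > end_id # more rows exist beyond this range
  }
end

ok = build_reply(ROWS, 1, 3, 11, true)    # sink checksum agrees with local state
stale = build_reply(ROWS, 1, 3, 10, true) # sink checksum differs
```

Here `ok[:checksum_matched]` is `true` with an empty `:records_to_synchronize`, while `stale` carries the three rows in the range. In both cases `:has_more` is `true` because row 4 lies beyond `end_id`, which tells the sink that another range remains to be verified.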