Module: RightScale::GlobalObjectReplicatorSource

Includes:
InfrastructureHelpers, ModelsHelper
Defined in:
lib/right_infrastructure_agent/global_object_replicator_source.rb

Overview

This module is intended as a mixin for an HTTP controller that handles requests from a global object replication sink. Each request checks whether replication is required for a global object owned by this server.

This module expects the following to be defined:

- logger - variable pointing to the standard logger in use
- global_object_replicator_source - method returning type of replication source, e.g., 'library'
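The two expectations above can be sketched as a host class. This is a minimal illustration, not the library's own code: `ReplicatorSourceStandIn`, `missing_replicator_requirements`, and `LibraryReplicationController` are hypothetical names, used so the sketch runs without the gem. A real controller would include RightScale::GlobalObjectReplicatorSource instead of the stand-in.

```ruby
require 'logger'

# Hypothetical stand-in for the real mixin so this sketch runs on its own.
module ReplicatorSourceStandIn
  REQUIRED = [:logger, :global_object_replicator_source]

  # Lists the expected names the including class has not defined
  def missing_replicator_requirements
    REQUIRED.reject { |name| respond_to?(name, true) }
  end
end

# Hypothetical controller supplying what the mixin expects
class LibraryReplicationController
  include ReplicatorSourceStandIn

  # Standard logger in use
  def logger
    @logger ||= Logger.new($stdout)
  end

  # Type of replication source
  def global_object_replicator_source
    'library'
  end
end

controller = LibraryReplicationController.new
puts controller.missing_replicator_requirements.inspect
```

The source describes logger as a variable. A method is used here only because it is the simplest way to expose one from a plain class.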

Constant Summary

Constants included from ModelsHelper

ModelsHelper::DEFAULT_RETRY_MESSAGE, ModelsHelper::DISABLED_SHARD_RETRY_MESSAGE, ModelsHelper::RETRYABLE_ERRORS, ModelsHelper::WRONG_SHARD_ERROR, ModelsHelper::WRONG_SHARD_RETRY_MESSAGE

Instance Method Summary

Methods included from ModelsHelper

#account, #audit_entry, #instance, #instance_from_agent_id, #instance_from_token_id, #instance_token, #is_retryable_error?, #permission, #query, #recipe, #recipe_from_name, #repositories, #retrieve, #retrieve_or_create_audit, #retrieve_or_default_user, #right_script, #right_script_from_name, #run_query, #setting, #user_credential, #user_credential_from_fingerprint

Methods included from InfrastructureHelpers

#constantize, #format_error, #render_nothing, #to_bool, #to_int_or_nil

Instance Method Details

#verify_replica_range ⇒ NilClass

Compares the checksum parameter with the local state of a replicated table and sends back a replicator/synchronize_replica_range request with records that may need to be updated in the core DB.

Parameters:

  • :sink (String)

    Type of replication sink making request, e.g., “core” or “library”

  • :class_name (String)

    Name of a class that acts_as_global_object

  • :schema_version (Integer)

    Requested schema version for any synchronization records

  • :checksum_type (String)

    Type of checksum; only ‘global_object_version_sum’ is supported

  • :max_id_at_start (Integer)

    State passed back to the core in the resulting synchronize_replica_range request

  • :begin_id (Integer, NilClass)

    First row ID in the checksum range, or nil for all rows

  • :end_id (Integer, NilClass)

    Last row ID in the checksum range, or nil for all rows

  • :send_records_on_checksum_mismatch (Boolean)

    Whether any records should be returned in the resulting synchronize_replica_range request

  • :checksum_value (Integer, NilClass)

    Checksum value calculated using the specific :checksum_type

  • :shard_id (Integer)

    Shard ID of the sender
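As an illustration of the parameters listed above, a request from a sink might carry a hash shaped like the following. All values are invented for the example, and 'ExampleGlobalObject' is a hypothetical class name, not one taken from the library.

```ruby
# Illustrative request parameters; every value here is made up.
params = {
  :sink                              => 'core',
  :class_name                        => 'ExampleGlobalObject', # hypothetical
  :schema_version                    => 1,
  :checksum_type                     => 'global_object_version_sum',
  :max_id_at_start                   => 5000,
  :begin_id                          => 1,
  :end_id                            => 1000,
  :send_records_on_checksum_mismatch => true,
  :checksum_value                    => 123456,
  :shard_id                          => 3
}

# Per the parameter descriptions, nil bounds cover all rows
full_table_params = params.merge(:begin_id => nil, :end_id => nil)

puts params.keys.size
puts full_table_params[:end_id].inspect
```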

Returns:

  • (NilClass)

    nil

Raises:

  • (ArgumentError)

    Unknown replication sink

  • (RightScale::Exceptions::QueryFailure)

    Failed to complete synchronization query

  • (RightScale::Exceptions::RetryableError)

    Query failed but may be retried
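The ArgumentError case can be shown in isolation. The sketch below mirrors the sink lookup from the method's source listing; `replicator_for` is a hypothetical helper name introduced for the example and is not part of the module.

```ruby
# Hypothetical helper mirroring the sink-to-replicator lookup in the listing
def replicator_for(sink)
  case sink
  when 'core' then 'core_replicator'
  else raise ArgumentError, "Unknown replication sink: #{sink.inspect}"
  end
end

puts replicator_for('core')

begin
  replicator_for('library')
rescue ArgumentError => e
  puts e.message
end
```

In the listing only "core" is mapped, so a "library" sink would take the ArgumentError branch even though the :sink description mentions it as an example value.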



# File 'lib/right_infrastructure_agent/global_object_replicator_source.rb', line 51

def verify_replica_range
  global_object_class = constantize(params[:class_name])
  schema_version = to_int_or_nil(params[:schema_version])
  checksum_type = params[:checksum_type]
  max_id_at_start = to_int_or_nil(params[:max_id_at_start])
  begin_id = to_int_or_nil(params[:begin_id])
  end_id = to_int_or_nil(params[:end_id])
  send_records_on_checksum_mismatch = params[:send_records_on_checksum_mismatch]
  checksum_value = params[:checksum_value]
  shard_id = to_int_or_nil(params[:shard_id])

  success = query("verify_replica_range for #{global_object_class.name} rows #{begin_id} " +
                  " - #{end_id}", :email_errors => true) do
    records_to_synchronize = []

    if (global_object_class.calculate_global_object_checksum(checksum_type, schema_version, begin_id, end_id) == checksum_value)
      checksum_matched = true
      logger.debug("GlobalObjectReplica: Verified #{global_object_class.name} global_object_version_sum " +
        "for rows #{begin_id} - #{end_id}")
    else
      checksum_matched = false
      logger.debug("GlobalObjectReplica: Verification of #{global_object_class.name} global_object_version_sum " +
        "rows #{begin_id} - #{end_id} failed. Sending synchronization records.")
      records_to_synchronize = if send_records_on_checksum_mismatch
        global_object_class.get_initialization_hashes(schema_version, begin_id, end_id)
      else
        []
      end
    end

    replicator = case params[:sink] # TODO Replace this with "#{params[:sink]}_replicator" once all replicator actors gone
                 when "core" then "core_replicator"
                 else raise ArgumentError, "Unknown replication sink: #{params[:sink].inspect}"
                 end
    payload = {
      :source                 => global_object_replicator_source,
      :class_name             => global_object_class.name,
      :checksum_type          => checksum_type,
      :max_id_at_start        => max_id_at_start,
      :begin_id               => begin_id,
      :end_id                 => end_id,
      :checksum_matched       => checksum_matched,
      :records_to_synchronize => records_to_synchronize,
      :has_more               => global_object_class.max_id > end_id }

    EM.next_tick do
      # Execute this request on next_tick since, depending on the configuration, the router could route
      # this request directly via HTTP and there would be no break in the chain of replication requests
      begin
        RightScale::RightHttpClient.push("/#{replicator}/synchronize_replica_range", payload, :scope => {:shard => shard_id})
      rescue Exception => e
        logger.error(format_error("Failed to synchronize_replica_range for class #{global_object_class.name}", e))
      end
    end

    true
  end

  raise RightScale::Exceptions::QueryFailure.new(@last_error) unless success

  render_nothing
end
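The decision logic of the method can be sketched with in-memory data. This is a simplified model under stated assumptions, not the library's implementation: `Row`, `in_range`, `version_sum`, and `verify_range` are hypothetical names, and the checksum is assumed (from its name only) to be the sum of per-row versions in the ID range. The sketch also guards the has_more comparison, because in plain Ruby `max_id > end_id` raises when end_id is nil.

```ruby
# Hypothetical in-memory stand-in for a class that acts_as_global_object
Row = Struct.new(:id, :global_object_version)

# Selects rows within the ID range; a nil bound means "unbounded"
def in_range(rows, begin_id, end_id)
  rows.select do |r|
    (begin_id.nil? || r.id >= begin_id) && (end_id.nil? || r.id <= end_id)
  end
end

# ASSUMPTION: the checksum is the sum of row versions in the range
def version_sum(rows, begin_id, end_id)
  in_range(rows, begin_id, end_id).inject(0) { |sum, r| sum + r.global_object_version }
end

# Builds a payload shaped like the one pushed to synchronize_replica_range
def verify_range(rows, begin_id, end_id, checksum_value, send_records)
  matched = version_sum(rows, begin_id, end_id) == checksum_value
  records = (!matched && send_records) ? in_range(rows, begin_id, end_id).map(&:to_h) : []
  max_id  = rows.map(&:id).max || 0
  {
    :begin_id               => begin_id,
    :end_id                 => end_id,
    :checksum_matched       => matched,
    :records_to_synchronize => records,
    # A nil end_id covers all rows, so nothing can remain after it
    :has_more               => end_id.nil? ? false : max_id > end_id
  }
end

rows = [Row.new(1, 2), Row.new(2, 5), Row.new(3, 1)]
ok   = verify_range(rows, 1, 2, 7, true)   # checksum agrees with rows 1-2
bad  = verify_range(rows, 1, 2, 6, true)   # checksum disagrees
puts ok.inspect
puts bad.inspect
```

When the checksum matches, no records are returned. On a mismatch, records are returned only if the caller asked for them, which matches the branch on send_records_on_checksum_mismatch in the listing.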