Module: RightScale::GlobalObjectReplicatorSink

Includes:
InfrastructureHelpers, ModelsHelper
Defined in:
lib/right_infrastructure_agent/global_object_replicator_sink.rb

Overview

This module is for use as a mixin in an HTTP controller that handles requests from a global object replication source (e.g., library) to synchronize its replicas.

This module expects the following to be defined:

- logger - variable pointing to the standard logger in use
- global_object_replicator_sink - method returning type of replication sink, e.g., 'core'

Constant Summary

Constants included from ModelsHelper

ModelsHelper::DEFAULT_RETRY_MESSAGE, ModelsHelper::DISABLED_SHARD_RETRY_MESSAGE, ModelsHelper::RETRYABLE_ERRORS, ModelsHelper::WRONG_SHARD_ERROR, ModelsHelper::WRONG_SHARD_RETRY_MESSAGE

Instance Method Summary collapse

Methods included from ModelsHelper

#account, #audit_entry, #instance, #instance_from_agent_id, #instance_from_token_id, #instance_token, #is_retryable_error?, #permission, #query, #recipe, #recipe_from_name, #repositories, #retrieve, #retrieve_or_create_audit, #retrieve_or_default_user, #right_script, #right_script_from_name, #run_query, #setting, #user_credential, #user_credential_from_fingerprint

Methods included from InfrastructureHelpers

#constantize, #format_error, #render_nothing, #to_bool, #to_int_or_nil

Instance Method Details

#handle_global_object_changeNilClass

Update the specified right_site core object (class_name, id) so that its attributes match those of the global object

Parameters:

  • :source (String)

    Replication source, e.g., ‘library’

  • :class_name (String)

    Name of the object class being updated

  • :id (Integer)

    Unique identifier of the object being updated

  • :schema_version (Integer)

    Schema version of the model (class)

  • :global_object_version (String)

    Global object version of the model (class)

  • :attrs (String)

    All attributes defined for the object represented by :class_name and :id in yaml

Returns:

  • (NilClass)

    nil

Raises:

  • (RightScale::Exceptions::QueryFailure)

    Failed to update object

  • (RightScale::Exceptions::RetryableError)

    Query failed but may be retried



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/right_infrastructure_agent/global_object_replicator_sink.rb', line 42

def handle_global_object_change
  source = params[:source]
  if (object_class = replicate_sink_class(params[:class_name]))
    logger.info("GlobalObjectReplica: Processing #{source} global object change for #{object_class.name}/#{params[:id]}")

    id = to_int_or_nil(params[:id])
    schema_version = to_int_or_nil(params[:schema_version])
    action  = "replicator sink handle_global_object_change for #{object_class.name}/#{id} " +
              "(schema version #{schema_version}, global object version #{params[:global_object_version]})"
    success = query(action, :include_backtrace_in_last_error => true) do
      object_class.handle_global_object_change(id, schema_version, params[:global_object_version], params[:attrs])
      true
    end

    raise RightScale::Exceptions::QueryFailure.new(@last_error) unless success
  else
    logger.warn("GlobalObjectReplica: Ignoring replicator sink handle_global_object_change from #{source} " +
                "for unknown will_replicate sink class #{params[:class_name].inspect}")
  end
  render_nothing
end

#synchronize_replica_rangeNilClass

Update rows using any received synchronization data and send a verify_replica_range request for the next range to process

Parameters:

  • :source (String)

    Replication source, e.g., ‘library’

  • :class_name (String)

    Name of a class that acts_as_global_object_replica

  • :checksum_type (String)

    Type of checksum: only ‘global_object_version_sum’

  • :max_id_at_start (Integer)

    Max ID of the replica_class when first started synchronization process

  • :begin_id (Integer, NilClass)

    First row ID in range verified by the source, or nil for all rows

  • :end_id (Integer, NilClass)

    Last row ID in range verified by the source, or nil for all rows

  • :checksum_matched (Boolean)

    Whether checksum matched

  • :records_to_synchronize (Array, NilClass)

    Blank array or nil (if the last verification check was positive) or an array of hashes to be used to update the replicated table with each hash containing the keys used in handle_global_object_change: :id, :schema_version, :global_object_version, :attrs

  • :has_more (Boolean)

    Indicates at least one more range should be verified

Returns:

  • (NilClass)

    nil

Raises:

  • (ArgumentError)

    Unknown replication source

  • (RightScale::Exceptions::QueryFailure)

    Failed to complete synchronization query

  • (RightScale::Exceptions::RetryableError)

    Query failed but may be retried



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/right_infrastructure_agent/global_object_replicator_sink.rb', line 134

def synchronize_replica_range
  source = params[:source]
  if (replica_class = replicate_sink_class(params[:class_name]))
    checksum_type = params[:checksum_type]
    max_id_at_start = to_int_or_nil(params[:max_id_at_start])
    begin_id = to_int_or_nil(params[:begin_id])
    end_id = to_int_or_nil(params[:end_id])
    checksum_matched = to_bool(params[:checksum_matched])
    records = params[:records_to_synchronize] || []
    has_more = to_bool(params[:has_more])

    success = query("synchronize_replica_range for #{replica_class.name}", :email_errors => true) do
      logger.info("GlobalObjectReplica: Synchronizing #{source} #{replica_class.name} range #{begin_id}-#{end_id} " +
                  "(#{checksum_type} #{checksum_matched ? 'match' : 'mismatch'}) #{records.size} rows received.")
      # Shuffle the records here so that collisions are less likely if two processes are trying to sync at once
      records.shuffle.each do |h|
        h = RightScale::SerializationHelper.symbolize_keys(h)
        replica_class.handle_global_object_change(h[:id], h[:schema_version], h[:global_object_version], h[:attrs])
      end

      in_sync = checksum_matched || !(records.nil? || records.empty?)

      if has_more || !in_sync
        set_global_object_replication_status(replica_class.name, checksum_type, :percent_complete =>
                                             (100 * ([begin_id, 1].max / [max_id_at_start, 1].max.to_f)).floor) if in_sync

        next_begin_id, next_end_id = calculate_next_range_for_binary_sync(source, max_id_at_start,
                                                                          replica_class.will_replicate_initialization_chunk_size,
                                                                          begin_id, end_id, in_sync)
        verify_next_replica_range(source, replica_class, checksum_type, max_id_at_start, next_begin_id, next_end_id)
      else
        logger.info("GlobalObjectReplica: Synchronization of #{source} #{replica_class.name} complete at row #{end_id}")
        set_global_object_replication_status(replica_class.name, checksum_type, :completed => true)
      end
      true
    end

    unless success
      set_global_object_replication_status(replica_class.name, checksum_type, :failed => true)
      raise RightScale::Exceptions::QueryFailure.new(@last_error)
    end
  else
    logger.warn("GlobalObjectReplica: Ignoring replicator sink synchronize_replica_range from #{source} " +
                "for unknown will_replicate sink class #{params[:class_name].inspect}")
  end
  render_nothing
end

#verify_replicasNilClass

Compare checksum parameter of a replicated table and, if they do not match, begin the synchronization process by sending verify_replica_range to replicator source

Parameters:

  • :source (String)

    Replication source, e.g., ‘library’

  • :class_name (String)

    Name of a class that acts_as_global_object_replica

  • :checksum_type (String)

    Type of checksum: only ‘global_object_version_sum’

  • :checksum_value (Integer, NilClass)

    Sum of global_object_versions for the table in the source database

Returns:

  • (NilClass)

    nil

Raises:

  • (ArgumentError)

    Unknown replication source

  • (RightScale::Exceptions::QueryFailure)

    Failed to complete verify replicas query

  • (RightScale::Exceptions::RetryableError)

    Query failed but may be retried



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/right_infrastructure_agent/global_object_replicator_sink.rb', line 78

def verify_replicas
  source = params[:source]
  if (replica_class = replicate_sink_class(params[:class_name]))
    checksum_type = params[:checksum_type]
    checksum_value = params[:checksum_value]

    success = query("verify_replicas for #{replica_class.name}", :email_errors => true) do
      if replica_class.calculate_global_object_checksum(checksum_type) == checksum_value
        logger.info("GlobalObjectReplica: Verified #{source} #{replica_class.name} global_object_version_sum.")
        set_global_object_replication_status(replica_class.name, checksum_type, :completed => true)
      else
        logger.info("GlobalObjectReplica: Verification of #{source} #{replica_class.name} global_object_version_sum failed. " +
                    "Beginning synchronization.")
        set_global_object_replication_status(replica_class.name, checksum_type, :start => true)

        max_id = replica_class.max_id
        verify_next_replica_range(source, replica_class, checksum_type, max_id, 0, max_id)
      end
      true
    end

    unless success
      set_global_object_replication_status(replica_class.name, checksum_type, :failed => true)
      raise RightScale::Exceptions::QueryFailure.new(@last_error)
    end
  else
    logger.warn("GlobalObjectReplica: Ignoring replicator sink verify_replicas from #{source} " +
                "for unknown will_replicate sink class #{params[:class_name].inspect}")
  end
  render_nothing
end