Class: Longleaf::S3ReplicationService

Inherits:
Object
  • Object
Includes:
Logging
Defined in:
lib/longleaf/preservation_services/s3_replication_service.rb

Overview

Preservation service which performs replication of a file to one or more s3 destinations.

The service definition must contain one or more destinations, specified with the “to” property. Each destination must be a known storage location of type s3. The s3 client configuration is controlled by the storage location.

Optional service configuration properties:

  • replica_collision_policy = specifies the desired outcome if the service attempts to replicate a file which already exists at a destination. Default: "replace".
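
As a rough illustration of how these properties are interpreted, the sketch below resolves the collision policy and the "to" destinations from a plain hash. The constant values and the helper name `resolve_replication_settings` are assumptions made for this example; in particular, the list of valid policies is a guess and may not match Longleaf's.

```ruby
# Stand-in constants. The real values live in Longleaf's service field
# definitions; the valid-policy list here is an assumption.
COLLISION_PROPERTY = 'replica_collision_policy'
DEFAULT_COLLISION_POLICY = 'replace'
VALID_COLLISION_POLICIES = ['replace'].freeze
REPLICATE_TO = 'to'

# Hypothetical helper: resolve the effective settings from a properties hash.
def resolve_replication_settings(properties)
  # Fall back to the default policy when none is configured
  policy = properties[COLLISION_PROPERTY] || DEFAULT_COLLISION_POLICY
  unless VALID_COLLISION_POLICIES.include?(policy)
    raise ArgumentError, "invalid #{COLLISION_PROPERTY} value #{policy}"
  end

  to = properties[REPLICATE_TO]
  if to.nil? || to.empty?
    raise ArgumentError, 'must provide one or more replication destinations'
  end

  # A single destination may be given as a bare string; wrap it in an array
  { policy: policy, destinations: Array(to) }
end

settings = resolve_replication_settings('to' => 'backup_s3')
puts settings.inspect
```

A definition with no "to" property, or with an unrecognized policy, is rejected with an ArgumentError before any destination is examined.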
    

Instance Attribute Summary

Instance Method Summary

Methods included from Logging

#initialize_logger, initialize_logger, logger, #logger

Constructor Details

#initialize(service_def, app_manager) ⇒ S3ReplicationService

Initialize an S3ReplicationService from the given service definition

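Parameters:

  • service_def

    the service definition supplying this service's name and properties.

  • app_manager

    the application manager whose location_manager is used to look up storage locations.
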
# File 'lib/longleaf/preservation_services/s3_replication_service.rb', line 31

def initialize(service_def, app_manager)
  @service_def = service_def
  @app_manager = app_manager

  # Set and validate the replica collision policy
  @collision_policy = @service_def.properties[SF::COLLISION_PROPERTY] || SF::DEFAULT_COLLISION_POLICY
  if !SF::VALID_COLLISION_POLICIES.include?(@collision_policy)
    raise ArgumentError.new("Service #{service_def.name} received invalid #{SF::COLLISION_PROPERTY}" \
        + " value #{@collision_policy}")
  end

  # Store and validate destinations
  replicate_to = @service_def.properties[SF::REPLICATE_TO]
  if replicate_to.nil? || replicate_to.empty?
    raise ArgumentError.new("Service #{service_def.name} must provide one or more replication destinations.")
  end
  replicate_to = [replicate_to] if replicate_to.is_a?(String)

  loc_manager = app_manager.location_manager
  # Build list of destinations, translating to storage locations when relevant
  @destinations = Array.new
  replicate_to.each do |dest|
    if loc_manager.locations.key?(dest)
      location = loc_manager.locations[dest]
      if location.type != ST::S3_STORAGE_TYPE
        raise ArgumentError.new(
            "Service #{service_def.name} specifies destination #{dest} which is not of type 's3'")
      end
      @destinations << loc_manager.locations[dest]
    else
      raise ArgumentError.new("Service #{service_def.name} specifies unknown storage location '#{dest}'" \
          + " as a replication destination")
    end
  end
end
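
The destination checks in the constructor can be tried in isolation with stand-in objects. The sketch below is not Longleaf code: `Location`, `resolve_destinations`, and the type strings are hypothetical, chosen only to mirror the two failure cases above (unknown location, and a known location that is not of type s3).

```ruby
# Hypothetical stand-in for a storage location; only name and type matter here.
Location = Struct.new(:name, :type)

# Assumed value of ST::S3_STORAGE_TYPE, based on the error message above
S3_STORAGE_TYPE = 's3'

# Translate destination names into locations, rejecting unknown or non-s3 ones.
def resolve_destinations(names, locations)
  names.map do |name|
    location = locations[name]
    raise ArgumentError, "unknown storage location '#{name}'" if location.nil?
    unless location.type == S3_STORAGE_TYPE
      raise ArgumentError, "destination #{name} is not of type 's3'"
    end
    location
  end
end

locations = {
  'backup_s3' => Location.new('backup_s3', 's3'),
  'local_fs'  => Location.new('local_fs', 'filesystem')
}

puts resolve_destinations(['backup_s3'], locations).map(&:name).inspect
```

Because validation happens at construction time, a misconfigured destination surfaces when the service is created rather than during the first replication attempt.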

Instance Attribute Details

#collision_policy ⇒ Object (readonly)

Returns the value of attribute collision_policy.



# File 'lib/longleaf/preservation_services/s3_replication_service.rb', line 25

def collision_policy
  @collision_policy
end

Instance Method Details

#is_applicable?(event) ⇒ Boolean

Determine if this service is applicable for the provided event, given the configured service definition

Parameters:

  • event (String)

    name of the event

Returns:

  • (Boolean)

    returns true if this service is applicable for the provided event



# File 'lib/longleaf/preservation_services/s3_replication_service.rb', line 112

def is_applicable?(event)
  case event
  when EventNames::PRESERVE
    true
  else
    false
  end
end

#perform(file_rec, event) ⇒ Object

During a replication event, perform replication of the specified file to all configured destinations as necessary.

Parameters:

  • file_rec (FileRecord)

    record representing the file to perform the service on.

  • event (String)

    name of the event this service is being invoked by.

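Raises:

  • (PreservationServiceError)

    if the file's storage location is not of filesystem type, or if a transfer to a destination fails.

  • (ChecksumMismatchError)

    if s3 reports that the provided MD5 does not match the received content.
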
# File 'lib/longleaf/preservation_services/s3_replication_service.rb', line 73

def perform(file_rec, event)
  if file_rec.storage_location.type == ST::FILESYSTEM_STORAGE_TYPE
    replicate_from_fs(file_rec)
  else
    raise PreservationServiceError.new("Replication from storage location of type " \
        + "#{file_rec.storage_location.type} to s3 is not supported")
  end
end
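
One way a caller might combine #is_applicable? and #perform is sketched below. Everything here is a stand-in: `FakeReplicationService`, `run_service`, the hash used as a file record, and the event and type strings ('preserve', 'filesystem') are assumptions for illustration, not Longleaf's actual classes or constant values.

```ruby
# Hypothetical stand-in for Longleaf's error class.
class PreservationServiceError < StandardError; end

# Hypothetical service mirroring the two methods documented above.
class FakeReplicationService
  def is_applicable?(event)
    event == 'preserve' # assumed value of EventNames::PRESERVE
  end

  def perform(file_rec, _event)
    type = file_rec[:storage_type]
    unless type == 'filesystem'
      raise PreservationServiceError,
            "Replication from storage location of type #{type} to s3 is not supported"
    end
    :replicated
  end
end

# Skip services that do not apply to the event; report failures as a symbol.
def run_service(service, file_rec, event)
  return :skipped unless service.is_applicable?(event)

  service.perform(file_rec, event)
rescue PreservationServiceError => e
  warn "replication failed: #{e.message}"
  :failed
end

service = FakeReplicationService.new
puts run_service(service, { storage_type: 'filesystem' }, 'preserve')
```

The point of the sketch is the ordering: applicability is checked first, and an unsupported source storage type is reported as an error rather than silently skipped.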

#replicate_from_fs(file_rec) ⇒ Object



# File 'lib/longleaf/preservation_services/s3_replication_service.rb', line 82

def replicate_from_fs(file_rec)
  # Determine the path to the file being replicated relative to its storage location
  rel_path = file_rec.storage_location.relativize(file_rec.path)

  @destinations.each do |destination|
    # Check that the destination is available before attempting to write
    verify_destination_available(destination, file_rec)

    rel_to_bucket = destination.relative_to_bucket_path(rel_path)
    file_obj = destination.s3_bucket.object(rel_to_bucket)
    begin
      file_obj.upload_file(file_rec.physical_path)
    rescue Aws::S3::Errors::BadDigest => e
      raise ChecksumMismatchError.new("Transfer to bucket '#{destination.s3_bucket.name}' failed, " \
          + "MD5 provided did not match the received content for #{file_rec.path}")
    rescue Aws::Errors::ServiceError => e
      raise PreservationServiceError.new("Failed to transfer #{file_rec.path} to bucket " \
          + "'#{destination.s3_bucket.name}': #{e.message}")
    end

    logger.info("Replicated #{file_rec.path} to destination #{file_obj.public_url}")

    # TODO register file in destination
  end
end
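
The path handling and error translation in #replicate_from_fs can be illustrated without the AWS SDK. In the sketch below, `relativize` and `relative_to_bucket_path` are guesses at what the storage location methods of the same names do (strip the location root, then prepend an optional key prefix); `FakeBadDigest` and `upload_with_translation` are hypothetical names standing in for the SDK error and the begin/rescue block above.

```ruby
# Hypothetical error classes standing in for the AWS SDK and Longleaf ones.
class FakeBadDigest < StandardError; end
class ChecksumMismatchError < StandardError; end

# Assumed behavior of relativize: strip the location's root from the path.
def relativize(location_root, path)
  root = location_root.end_with?('/') ? location_root : "#{location_root}/"
  raise ArgumentError, "#{path} is not within #{root}" unless path.start_with?(root)

  path[root.length..-1]
end

# Assumed behavior of relative_to_bucket_path: prepend an optional key prefix.
def relative_to_bucket_path(prefix, rel_path)
  (prefix.nil? || prefix.empty?) ? rel_path : File.join(prefix, rel_path)
end

# Run the given upload block, translating a digest failure into a domain error.
def upload_with_translation(key)
  yield key
  key
rescue FakeBadDigest
  raise ChecksumMismatchError,
        "MD5 provided did not match the received content for #{key}"
end

rel = relativize('/data/store', '/data/store/a/b.txt')
key = relative_to_bucket_path('replicas', rel)
puts upload_with_translation(key) { |k| "uploading #{k}" }
```

Translating the SDK error into a service-level error keeps callers independent of the storage client in use, which appears to be the intent of the rescue clauses in the listing above.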