Class: NcsNavigator::Warehouse::TransformLoad

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
StringifyTrace
Defined in:
lib/ncs_navigator/warehouse/transform_load.rb

Overview

Executes the ETL process for a given configuration.

The configuration is "used up" after one execution; to run ETL multiple times in the same process, the configuration will need to be reloaded.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from StringifyTrace

stringify_trace

Constructor Details

#initialize(configuration) ⇒ TransformLoad

Returns a new instance of TransformLoad.



21
22
23
# File 'lib/ncs_navigator/warehouse/transform_load.rb', line 21

def initialize(configuration)
  @configuration = configuration
end

Instance Attribute Details

#configurationObject (readonly)

Returns the value of attribute configuration.



16
17
18
# File 'lib/ncs_navigator/warehouse/transform_load.rb', line 16

def configuration
  @configuration
end

#statusesObject (readonly)

Returns the value of attribute statuses.



17
18
19
# File 'lib/ncs_navigator/warehouse/transform_load.rb', line 17

def statuses
  @statuses
end

Instance Method Details

#runObject



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/ncs_navigator/warehouse/transform_load.rb', line 25

def run
  position = 0
  @statuses = configuration.transformers.collect do |transformer|
    ::DataMapper.repository(:mdes_warehouse_working) do |repo|
      # redefine identity map as a no-op so it doesn't cache
      # anything. TODO: provide a patch to DataMapper that makes
      # something like this an option.
      def repo.identity_map(model); {}; end

      build_status_for(transformer, position).tap do |status|
        begin
          TransformStatus.transaction do
            if repo.adapter.to_s =~ /Postgres/
              repo.adapter.execute("SET LOCAL synchronous_commit TO OFF")
            end
            begin
              transformer.transform(status)
            rescue => e
              shell.say_line("\nTransform failed. (See log for more detail.)")
              msg = "Transform failed. #{e.class}: #{e}\n#{stringify_trace(e.backtrace)}"
              log.error(msg)
              status.add_error(msg)
            end
          end
        rescue DataObjects::IntegrityError => e
          shell.say_line(
            "\nTransform failed with data integrity error. (See log for more detail.)")
          log.error(
            "Transform failed with data integrity error: #{e}.\n#{stringify_trace(e.backtrace)}")
          status.add_error("Transform failed with data integrity error: #{e}.")
        end
        status.end_time = Time.now
        unless status.save
          shell.say_line("Could not save status for transformer #{status.name}")
          log.warn("Could not save status for transformer #{status.name}")
        end
        position += 1
      end
    end
  end

  if statuses.detect { |s| !s.transform_errors.empty? }
    dispatch_post_etl_hooks(:etl_failed)
    false
  else
    dispatch_post_etl_hooks(:etl_succeeded)
    true
  end
end