Class: ETL

Inherits:
Object
Includes:
ActiveSupport::Callbacks, TeguGears
Defined in:
lib/etl/etl.rb

Overview

A state machine with callbacks for extracting, transforming, and loading data (ETL), with some support for retrying failed stages of the process. Raise errors liberally when something goes wrong: the data is staged at each step, so the process can usually be restarted once the issue has been addressed.

Direct Known Subclasses

ActiveRecordLoader, CSV::ET, XML::ET

Constant Summary

VALID_STATES =
[:before_extract, :extract, :after_extract, :before_transform, :transform, :after_transform, :before_load, :load, :after_load, :complete].freeze
VALID_CALLBACKS =
[:before_extract, :after_extract, :before_transform, :after_transform, :before_load, :after_load, :complete].freeze
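
The relationship between the two constants can be checked directly. The sketch below copies both arrays from the listing above; `WORK_STATES` and `CALLBACKS_ARE_STATES` are hypothetical names introduced only for this illustration and are not part of the class.

```ruby
# Constants copied from the summary above.
VALID_STATES = [:before_extract, :extract, :after_extract,
                :before_transform, :transform, :after_transform,
                :before_load, :load, :after_load, :complete].freeze
VALID_CALLBACKS = [:before_extract, :after_extract,
                   :before_transform, :after_transform,
                   :before_load, :after_load, :complete].freeze

# Hypothetical name: the states that have no callback of their own.
# These are the three work stages: [:extract, :transform, :load].
WORK_STATES = (VALID_STATES - VALID_CALLBACKS).freeze

# Hypothetical name: true when every callback name is also a valid state.
CALLBACKS_ARE_STATES = (VALID_CALLBACKS - VALID_STATES).empty?
```

In other words, every state except the three work stages doubles as a callback hook.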

Constructor Details

#initialize ⇒ ETL

Returns a new instance of ETL.



# File 'lib/etl/etl.rb', line 109

def initialize
  @state = :before_extract
end

Instance Attribute Details

#block ⇒ Object (readonly)

An optional block to process with



# File 'lib/etl/etl.rb', line 134

def block
  @block
end

#data ⇒ Object (readonly)

The data being worked on, after it has successfully completed an extract, transform, or load process.



# File 'lib/etl/etl.rb', line 118

def data
  @data
end

#options ⇒ Object (readonly)

The options to process with. All your code will have access to these options, so things like:

:filename => '…', :destination => '…', :converters => :all

would all be useful. Your extract, transform, and load methods plus your callbacks can then extract out the information they need to get the job done.



# File 'lib/etl/etl.rb', line 131

def options
  @options
end

#raw ⇒ Object (readonly)

The data generated on a process that didn’t complete.



# File 'lib/etl/etl.rb', line 121

def raw
  @raw
end

#state ⇒ Object (readonly)

The current state of the ETL process (one of VALID_STATES).



# File 'lib/etl/etl.rb', line 114

def state
  @state
end

Class Method Details

.call ⇒ Object

The listing below shows the body of .process, which suggests that .call is an alias for it.



# File 'lib/etl/etl.rb', line 27

def process(options={}, &block)
  etl = new
  etl.process(options, &block)
  etl
end

.logger ⇒ Object

Sets up a logger for the class. Respects inheritance, so a different logger is created for each ETL subclass. Uses the standard log levels: DEBUG < INFO < WARN < ERROR < FATAL. The console outputter reports WARN and above; the log file records everything from DEBUG up.



# File 'lib/etl/etl.rb', line 32

def logger
  logger_name = (self.to_s + "_logger").to_sym
  
  # Find and return the cached logger, if it's setup
  logger = read_inheritable_attribute(logger_name)
  return logger if logger
  
  # Create a logger.  Will configure it here and save it in a moment.
  logger = Log4r::Logger.new(self.to_s)
  
  # Set my default output format
  format = Log4r::PatternFormatter.new(:pattern => "[%l] %d :: %m")
  
  # Setup a console logger with our formatting
  console = Log4r::StderrOutputter.new 'console'
  console.level = Log4r::WARN
  console.formatter = format
  
  # Setup a logger to a file with our formatting
  logfile = Log4r::FileOutputter.new('logfile', 
                           :filename => self.logger_filename, 
                           :trunc => false,
                           :level => Log4r::DEBUG)
  logfile.formatter = format
  
  # Tell the logger about both outputs.
  logger.add('console','logfile')
  
  # Store the logger as an inheritable class attribute
  write_inheritable_attribute(logger_name, logger)
  
  # Return the logger
  logger
end
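
The "one logger per subclass" idea can be sketched without Log4r or ActiveSupport. The example below is an illustration only: it swaps in Ruby's standard-library `Logger` and a class-level instance variable in place of `read_inheritable_attribute`, and `SketchETL` and `SketchCSV` are hypothetical stand-ins for ETL and one of its subclasses.

```ruby
require "logger"

# Hypothetical stand-in for ETL: caches one logger per class,
# so a subclass never shares a logger with its parent.
class SketchETL
  def self.logger
    # A class-level instance variable belongs to exactly one class,
    # so each subclass builds and caches its own logger on first use.
    @logger ||= begin
      log = Logger.new($stderr)
      log.progname = to_s        # label messages with the class name
      log.level = Logger::WARN   # mirrors the console threshold in the listing
      log
    end
  end
end

class SketchCSV < SketchETL; end
```

Calling `SketchETL.logger` twice returns the same object, while `SketchCSV.logger` returns a separate logger labelled with the subclass name. Unlike the listing, this sketch writes only to standard error and does not create a log file.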

.logger_filename ⇒ Object



# File 'lib/etl/etl.rb', line 101

def logger_filename
  File.join(self.logger_root, "#{self.to_s}.log")
end

.logger_root ⇒ Object

Resolves the directory where log files are written, in three steps.

First, tries to get the cached @@logger_root.

Second, sets the global @@logger_root unless it is cached. Sets it to the best possible place to locate the logs:
  1) where log will be from RAILS_ROOT/vendor/gems/etl
  2) where log will be in a Rails model
  3) where log will be in a Rails lib
  4) in the local directory where ETL is being subclassed

Third, uses the subclass's stored logger_root, ignoring all the rest if this is found.



# File 'lib/etl/etl.rb', line 76

def logger_root
  @@logger_root ||= case
  when File.exist?(File.dirname(__FILE__) + "/../../../../../log")
    File.expand_path(File.dirname(__FILE__) + "/../../../../../log")
  when File.exist?(File.dirname(__FILE__) + "/../../log")
    File.expand_path(File.dirname(__FILE__) + '/../../log')
  when File.exist?(File.dirname(__FILE__) + "/../log")
    File.expand_path(File.dirname(__FILE__) + '/../log')
  when File.exist?(File.dirname(__FILE__) + "/log")
    File.expand_path(File.dirname(__FILE__) + '/log')
  else
    File.expand_path('.')
  end
  logger_root = read_inheritable_attribute(:logger_root) || @@logger_root
end
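
The directory search in the listing can be expressed as a lookup over an ordered list of candidates. The sketch below is an illustration under stated assumptions: `find_log_root` is a hypothetical helper, it takes the starting directory as an argument instead of using `__FILE__`, and it leaves out both the `@@logger_root` caching and the per-subclass override.

```ruby
# Hypothetical helper: returns the first existing "log" location among the
# same relative candidates the listing checks, in the same order, and falls
# back to the current working directory when none of them exists.
def find_log_root(base_dir)
  candidates = ["/../../../../../log", "/../../log", "/../log", "/log"]
  hit = candidates.find { |rel| File.exist?(base_dir + rel) }
  hit ? File.expand_path(base_dir + hit) : File.expand_path(".")
end
```

Because the candidates are tried in order, a log directory five levels up takes precedence over one that sits next to the starting directory.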

.logger_root=(value) ⇒ Object

Sets the logger root for the subclass, and sets it globally if this is set on ETL. So, ETL.logger_root = "some location" sets the logger root for all subclasses. This is useful if a lot of ETL is being done, and it needs to be logged in a non-standard place.



# File 'lib/etl/etl.rb', line 96

def logger_root=(value)
  write_inheritable_attribute(:logger_root, value)
  @@logger_root = value if self == ETL
end

.process(options = {}, &block) ⇒ Object



# File 'lib/etl/etl.rb', line 22

def process(options={}, &block)
  etl = new
  etl.process(options, &block)
  etl
end

Instance Method Details

#process(options = {}, &block) ⇒ Object

Working towards a universal workflow driver here. The signature is just a hash and a block. That should work for about anything.



# File 'lib/etl/etl.rb', line 138

def process(options={}, &block)
  # Only setup the options the first time, the other times we are re-
  # starting the process. 
  @options = options unless @options
  @block = block

  self.class.logger.info "Processing #{self.class.to_s}"
  self.class.logger.info "To re-run this process, run: #{self.show_command}"
  self.class.logger.info "Note: Also pass the same block to #{self.class.to_s}" if block

  etl_callback(:before_extract)

  if @state == :extract
    extract 
    @state = :after_extract
  end

  etl_callback(:after_extract)

  # To be sure this is after all after_extract callbacks
  process_raw_data
  
  etl_callback(:before_transform)

  if @state == :transform
    transform
    @state = :after_transform
  end

  etl_callback(:after_transform)
  
  # To be sure this is after all after_transform callbacks
  process_raw_data
  
  etl_callback(:before_load)

  if @state == :load
    load
    @state = :after_load
  end

  etl_callback(:after_load)
  @state
end
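
The state guards in the listing are what make a restart possible: a stage runs only when the state says it is due, so calling process again after a failure does not repeat stages that already finished. The sketch below illustrates that idea in isolation. `MiniFlow` is a hypothetical, simplified stand-in with no callbacks, no logging, and only three states plus `:complete`; it is not the ETL implementation, and its transform stage is rigged to fail once.

```ruby
# Hypothetical, simplified stand-in for the guarded flow above.
class MiniFlow
  STAGES = [:extract, :transform, :load].freeze

  attr_reader :state, :ran

  def initialize
    @state = STAGES.first
    @ran = []           # record of the stages that actually completed
    @fail_once = true   # makes transform fail on its first attempt
  end

  def process
    STAGES.each do |stage|
      next unless @state == stage   # skip stages that are not due
      send(stage)                   # an error here leaves @state unchanged
      @ran << stage
      @state = next_state(stage)
    end
    @state
  end

  private

  # The stage after the given one, or :complete after the last stage.
  def next_state(stage)
    STAGES[STAGES.index(stage) + 1] || :complete
  end

  def extract; end

  def load; end

  def transform
    return unless @fail_once
    @fail_once = false
    raise "transform failed"        # simulated transient error
  end
end
```

A first call to `process` raises from the transform stage and leaves the state at `:transform`. A second call on the same object skips extract, re-runs transform, and then runs load.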

#reverse_to(state) ⇒ Object

Sets the process back to the given state so that the stages from that point on can be run again. Raises ArgumentError unless the state is one of VALID_STATES.

Raises:

  • (ArgumentError)


# File 'lib/etl/etl.rb', line 183

def reverse_to(state)
  raise ArgumentError, "State must be one of #{VALID_STATES.inspect}" unless VALID_STATES.include?(state)
  loc = VALID_STATES.index(state)
  possible_states = VALID_STATES[0..loc]
  raise "Cannot reverse to a state that hasn't been achieved yet." unless possible_states.include?(state)
  @state = state
end
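
Note that in the listing, possible_states is sliced up to the index of the requested state, so it always contains that state and the second guard can never fire. If the intent is to allow moving back only to states that have already been reached, the slice would need to end at the current state instead. The sketch below shows one way to do that. `StateHolder` is a hypothetical stand-in that holds only what the method needs, and the changed slice is an assumption about the intent, not the library's code.

```ruby
# Hypothetical stand-in holding only the state and the guard logic.
class StateHolder
  VALID_STATES = [:before_extract, :extract, :after_extract,
                  :before_transform, :transform, :after_transform,
                  :before_load, :load, :after_load, :complete].freeze

  attr_reader :state

  def initialize(state)
    @state = state
  end

  def reverse_to(target)
    unless VALID_STATES.include?(target)
      raise ArgumentError, "State must be one of #{VALID_STATES.inspect}"
    end
    # Assumption: only states up to and including the current one count
    # as reached, so the slice ends at @state rather than at the target.
    reached = VALID_STATES[0..VALID_STATES.index(@state)]
    unless reached.include?(target)
      raise "Cannot reverse to a state that hasn't been achieved yet."
    end
    @state = target
  end
end
```

With this variant, an object at `:transform` can be reversed to `:extract`, while a request to jump ahead to `:load` raises a RuntimeError.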