Class: ETL::Engine

Inherits:
Object
  • Object
show all
Defined in:
lib/etl/engine.rb

Overview

The main ETL engine class.

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Class Attribute Details

.average_rows_per_second ⇒ Object

Accessor for the average rows per second processed



114
115
116
# File 'lib/etl/engine.rb', line 114

# Average throughput in rows per second, as reported at the end of
# ETL::Engine#process. Returns nil until a value has been assigned.
def average_rows_per_second
  @average_rows_per_second if defined?(@average_rows_per_second)
end

.current_destination ⇒ Object

The current destination



76
77
78
# File 'lib/etl/engine.rb', line 76

# The destination ETL::Engine#process is currently writing rows to, or nil
# if no destination has been assigned yet.
def current_destination
  @current_destination if defined?(@current_destination)
end

.current_source ⇒ Object

The current source



70
71
72
# File 'lib/etl/engine.rb', line 70

# The source ETL::Engine#process is currently reading from, or nil if no
# source has been assigned yet.
def current_source
  @current_source if defined?(@current_source)
end

.current_source_row ⇒ Object

The current source row



73
74
75
# File 'lib/etl/engine.rb', line 73

# The 1-based number of the row currently being processed within the current
# source (ETL::Engine#process assigns index + 1), or nil before any row.
def current_source_row
  @current_source_row if defined?(@current_source_row)
end

.job ⇒ Object

Access the current ETL::Execution::Job instance



95
96
97
# File 'lib/etl/engine.rb', line 95

# The ETL::Execution::Job record for the run in progress (created by
# ETL::Engine#process), or nil if none has been assigned.
def job
  @job if defined?(@job)
end

.limit ⇒ Object

The limit on rows to load from the source, useful for testing the ETL process prior to executing the entire batch. Default value is nil and indicates that there is no limit



100
101
102
# File 'lib/etl/engine.rb', line 100

# Maximum number of rows to read from sources (set from options[:limit] by
# init). nil means no limit.
def limit
  @limit if defined?(@limit)
end

.log_write_mode ⇒ Object

Accessor for the log write mode. Default is 'a' (append); init sets it to 'w' (overwrite) when the :newlog option is given.



43
44
45
# File 'lib/etl/engine.rb', line 43

# Mode used to open the log file: 'a' (append) unless something else has
# been assigned, e.g. 'w' when init is called with the :newlog option.
# The default is memoized on first read so callers never see nil.
def log_write_mode
  @log_write_mode ||= 'a'
end

.logger ⇒ Object

The logger the engine uses for its debug and error messages.



49
50
51
# File 'lib/etl/engine.rb', line 49

# Logger used by the engine for debug and error output.
#
# An explicitly assigned logger is always returned as-is. Otherwise one is
# created on first use:
# * when timestamped_log is set: a new file named "etl_<timestamp>.log";
# * otherwise "etl.log", opened with the log write mode ('a' unless another
#   mode such as 'w' has been assigned).
# The created logger's level is WARN, so the per-row debug messages emitted
# while processing are suppressed unless the level is lowered.
def logger
  unless @logger
    # Lazy, deliberate require: only needed when no logger was injected.
    require 'logger'
    @logger = if @timestamped_log
                Logger.new("etl_#{timestamp}.log")
              else
                Logger.new(File.open('etl.log', @log_write_mode || 'a'))
              end
    @logger.level = Logger::WARN
  end
  @logger
end

.offset ⇒ Object

The offset for the source to begin at, useful for testing the ETL process prior to executing the entire batch. Default value is nil and indicates that there is no offset



105
106
107
# File 'lib/etl/engine.rb', line 105

# Offset at which sources begin reading (set from options[:offset] by init).
# nil means no offset.
def offset
  @offset if defined?(@offset)
end

.read_locally ⇒ Object

Set to true to read locally from the last source cache files



111
112
113
# File 'lib/etl/engine.rb', line 111

# Flag (set from options[:read_locally] by init) requesting that sources be
# read from the last source cache files. nil when never assigned.
def read_locally
  @read_locally if defined?(@read_locally)
end

.realtime_activity ⇒ Object

Set to true to activate realtime activity. This will cause certain information messages to be printed to STDOUT



80
81
82
# File 'lib/etl/engine.rb', line 80

# When truthy, say/say_without_newline print progress and informational
# messages to $stdout. nil (silent) when never assigned.
def realtime_activity
  @realtime_activity if defined?(@realtime_activity)
end

.rows_read ⇒ Object

Accessor for the total number of rows read from sources



83
84
85
# File 'lib/etl/engine.rb', line 83

# Total number of rows read from sources. Lazily defaults to 0 so that
# ETL::Engine#process can do `Engine.rows_read += 1` and compare against
# the limit without a prior initialization step (nil + 1 would raise).
def rows_read
  @rows_read ||= 0
end

.rows_written ⇒ Object

Accessor for the total number of rows processed



89
90
91
# File 'lib/etl/engine.rb', line 89

# Total number of rows written (counted once per row, against the first
# destination). Lazily defaults to 0 so `Engine.rows_written += 1` works
# without a prior initialization step (nil + 1 would raise).
def rows_written
  @rows_written ||= 0
end

.skip_bulk_import ⇒ Object

Set to true to skip all bulk importing



108
109
110
# File 'lib/etl/engine.rb', line 108

# Flag (set from options[:skip_bulk_import] by init) to skip all bulk
# importing. nil when never assigned.
def skip_bulk_import
  @skip_bulk_import if defined?(@skip_bulk_import)
end

.timestamped_log ⇒ Object

Returns the value of attribute timestamped_log.



40
41
42
# File 'lib/etl/engine.rb', line 40

# Timestamped-log flag; nil when never assigned.
# NOTE(review): no reader of this flag is visible in this section — confirm
# how it affects log file naming.
def timestamped_log
  @timestamped_log if defined?(@timestamped_log)
end

Class Method Details

.init(options = {}) ⇒ Object

Initialization that is run when a job is executed.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/etl/engine.rb', line 14

# Initialization that is run when a job is executed. Once a call has
# completed successfully, further calls are no-ops (guarded by @initialized).
#
# Options:
# * :limit            - maximum number of rows to read from sources (nil = no limit)
# * :offset           - offset at which sources begin (nil = no offset)
# * :newlog           - when truthy, the log write mode becomes 'w' (overwrite)
# * :skip_bulk_import - set to true to skip all bulk importing
# * :read_locally     - set to true to read from the last source cache files
# * :config           - path to the database configuration file
#                       (default 'database.yml')
#
# The +options+ hash is not modified. Returns true when initialization ran,
# nil when it was already done.
def init(options={})
  return if @initialized

  @limit = options[:limit]
  @offset = options[:offset]
  @log_write_mode = 'w' if options[:newlog]
  @skip_bulk_import = options[:skip_bulk_import]
  @read_locally = options[:read_locally]

  # Resolve the default locally instead of writing it back into the
  # caller's hash.
  config_path = options[:config] || 'database.yml'
  # File.read, unlike IO.read, never treats a leading "|" as a command to run.
  # NOTE(review): the file is rendered through ERB (arbitrary Ruby) and then
  # YAML-loaded, so :config must point at a trusted file.
  database_configuration = YAML::load(ERB.new(File.read(config_path)).result + "\n")
  ETL::ActiveRecord::Base.configurations = database_configuration
  ActiveRecord::Base.configurations.merge!(ETL::ActiveRecord::Base.configurations)
  require 'etl/execution'
  ETL::Execution::Base.establish_connection :etl_execution
  ETL::Execution::Execution.migrate
  @initialized = true
end

.process(control_file) ⇒ Object

Process the specified control file. Acceptable values for control_file are:

  • Path to a file

  • File object

  • ETL::Control::Control instance



36
37
38
# File 'lib/etl/engine.rb', line 36

# Convenience entry point: builds a fresh engine instance and hands it the
# control (a path, a File object or an ETL::Control::Control instance).
# Returns whatever the instance-level #process returns.
def process(control_file)
  engine = new
  engine.process(control_file)
end

.timestamp ⇒ Object

Get a timestamp value as a string



65
66
67
# File 'lib/etl/engine.rb', line 65

# Get a timestamp value as a string of the form YYYYMMDDHHMMSS
# (e.g. "20200102030405").
#
# time - the Time to format; defaults to the current time, so existing
#        zero-argument callers behave exactly as before.
def timestamp(time = Time.now)
  time.strftime("%Y%m%d%H%M%S")
end

Instance Method Details

#benchmarks ⇒ Object

Get a Hash of benchmark values where each value represents the total amount of time in seconds spent processing in that portion of the ETL pipeline. Keys include:

  • :transforms

  • :after_reads

  • :before_writes

  • :writes



147
148
149
150
151
152
153
154
# File 'lib/etl/engine.rb', line 147

# Cumulative seconds spent in each portion of the ETL pipeline, keyed by
# stage: :transforms, :after_reads, :before_writes and :writes. Every
# counter starts at 0; the same Hash is returned on each call.
def benchmarks
  return @benchmarks if @benchmarks
  stages = [:transforms, :after_reads, :before_writes, :writes]
  @benchmarks = Hash[stages.map { |stage| [stage, 0] }]
end

#errors ⇒ Object

Array of errors encountered during execution of the ETL process



136
137
138
# File 'lib/etl/engine.rb', line 136

# Messages for the errors encountered while executing the ETL process.
# Created empty on first access; the same Array is returned on every call.
def errors
  @errors = [] unless @errors
  @errors
end

#process(control) ⇒ Object

Process a control file or object. Acceptable values for control are:

  • Path to a file

  • File object

  • ETL::Control::Control instance



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
# File 'lib/etl/engine.rb', line 160

# Process a control file or object. Acceptable values for +control+ are:
#
# * Path to a file
# * File object
# * ETL::Control::Control instance
#
# Each source row passes through four timed stages: after-read processors,
# transforms, before-write processors and destination writes. If a stage
# raises, the error is added to #errors and logged (with backtrace), and the
# remaining stages are skipped for that row. Once the control's error
# threshold is exceeded, reading stops and the method returns early.
#
# NOTE(review): the early-return path neither closes destinations nor
# updates the job record, which therefore keeps status 'executing'.
def process(control)
  control = ETL::Control::Control.resolve(control)

  ETL::Engine.job = ETL::Execution::Job.create!(
    :control_file => control.file,
    :status => 'executing'
  )

  execute_dependencies(control)

  start_time = Time.now

  Engine.logger.debug "Pre-processing #{control.file}"
  pre_process(control)
  Engine.logger.debug "Pre-processing complete"

  sources = control.sources
  destinations = control.destinations

  # Error-message builders, one per stage. They read the engine's current
  # source/row/destination when called, i.e. at the moment a stage fails.
  after_read_error = lambda do |e|
    "Error processing rows after read from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
  end
  transform_error = lambda do |e|
    "Error transforming from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
  end
  before_write_error = lambda do |e|
    "Error processing rows before write from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
  end
  write_error = lambda do |e|
    "Error writing to #{Engine.current_destination}: #{e}"
  end

  say "Skipping bulk import" if Engine.skip_bulk_import

  sources.each do |source|
    Engine.current_source = source
    Engine.logger.debug "Processing source #{source}"
    say "Source: #{source}"
    say "Limiting enabled: #{Engine.limit}" unless Engine.limit.nil?
    say "Offset enabled: #{Engine.offset}" unless Engine.offset.nil?
    source.each_with_index do |row, index|
      # Break out of the row loop if +Engine.limit+ is specified and the
      # number of rows read has reached that value.
      if !Engine.limit.nil? && Engine.rows_read >= Engine.limit
        puts "Reached limit of #{Engine.limit}"
        break
      end

      Engine.logger.debug "Row #{index}: #{row.inspect}"
      Engine.rows_read += 1
      Engine.current_source_row = index + 1
      if Engine.realtime_activity && index > 0 && index % 1000 == 0
        say_without_newline "."
      end

      # A single row may be expanded into several rows by row processors,
      # so every stage below works on the +rows+ array rather than +row+.
      rows = [row]

      # break/next must be issued here, in the row loop: inside the
      # Benchmark.realtime block they would only leave that block and the
      # failed row would carry on through the remaining stages.
      outcome = run_pipeline_stage(control, :after_reads, after_read_error) do
        Engine.logger.debug "Processing after read"
        control.after_read_processors.each do |processor|
          rows = rows.map { |r| processor.process(r) }.flatten
        end
      end
      break if outcome == :abort
      next if outcome == :skip

      outcome = run_pipeline_stage(control, :transforms, transform_error) do
        Engine.logger.debug "Executing transforms"
        rows.each do |r|
          control.transforms.each do |transform|
            name = transform.name.to_sym
            r[name] = transform.transform(name, r[name], r)
          end
        end
      end
      break if outcome == :abort
      next if outcome == :skip

      outcome = run_pipeline_stage(control, :before_writes, before_write_error) do
        # execute row-level "before write" processing; processors may drop
        # a row by returning nil (removed by compact)
        Engine.logger.debug "Processing before write"
        control.before_write_processors.each do |processor|
          rows = rows.map { |r| processor.process(r) }.flatten.compact
        end
      end
      break if outcome == :abort
      next if outcome == :skip

      outcome = run_pipeline_stage(control, :writes, write_error) do
        destinations.each_with_index do |destination, dest_index|
          Engine.current_destination = destination
          rows.each do |r|
            destination.write(r)
            # Count each row once, against the first destination only.
            Engine.rows_written += 1 if dest_index == 0
          end
        end
      end
      break if outcome == :abort
    end

    if exceeded_error_threshold?(control)
      say_on_own_line "Exiting due to exceeding error threshold: #{control.error_threshold}"
      return
    end
  end

  destinations.each do |destination|
    destination.close
  end

  say_on_own_line "Executing post processes"
  Engine.logger.debug "Post-processing #{control.file}"
  post_process(control)
  Engine.logger.debug "Post-processing complete"
  say "Post-processing complete"

  if sources.length > 0
    say_on_own_line "Read #{Engine.rows_read} lines from sources"
  end
  if destinations.length > 0
    say "Wrote #{Engine.rows_written} lines to destinations"
  end
  say "Completed #{control.file} in #{distance_of_time_in_words(start_time)} with #{errors.length} errors."
  say "Processing average: #{Engine.average_rows_per_second} rows/sec"

  say "Avg after_reads: #{Engine.rows_read/benchmarks[:after_reads]} rows/sec" if benchmarks[:after_reads] > 0
  say "Avg before_writes: #{Engine.rows_read/benchmarks[:before_writes]} rows/sec" if benchmarks[:before_writes] > 0
  say "Avg transforms: #{Engine.rows_read/benchmarks[:transforms]} rows/sec" if benchmarks[:transforms] > 0
  say "Avg writes: #{Engine.rows_read/benchmarks[:writes]} rows/sec" if benchmarks[:writes] > 0

  say "Avg time writing execution records: #{ETL::Execution::Record.average_time_spent}"

  # ETL::Transform::Transform.benchmarks.each do |klass, t|
  #   say "Avg #{klass}: #{Engine.rows_read/t} rows/sec"
  # end

  ETL::Engine.job.completed_at = Time.now
  ETL::Engine.job.status = (errors.length > 0 ? 'completed with errors' : 'completed')
  ETL::Engine.job.save!
end

# Runs one pipeline stage (the given block) for the current row and adds its
# wall-clock time to benchmarks[+benchmark_key+].
#
# Returns :ok when the block completes. If it raises, the message built by
# +describe_error+ (a callable taking the exception) is appended to #errors
# and logged together with the backtrace; the result is then :abort when the
# control's error threshold is exceeded, or :skip to drop only this row.
def run_pipeline_stage(control, benchmark_key, describe_error)
  failure = nil
  elapsed = Benchmark.realtime do
    begin
      yield
    rescue => e
      failure = e
    end
  end
  benchmarks[benchmark_key] += elapsed
  return :ok unless failure

  msg = describe_error.call(failure)
  errors << msg
  Engine.logger.error(msg)
  (failure.backtrace || []).each { |line| Engine.logger.error(line) }
  exceeded_error_threshold?(control) ? :abort : :skip
end
private :run_pipeline_stage

#say(message) ⇒ Object

Say the specified message, with a newline



118
119
120
# File 'lib/etl/engine.rb', line 118

# Say the specified message followed by a newline. Output goes through
# say_without_newline, so it only appears when realtime activity is enabled.
def say(message)
  line = message + "\n"
  say_without_newline(line)
end

#say_on_own_line(message) ⇒ Object

Say the message on its own line



131
132
133
# File 'lib/etl/engine.rb', line 131

# Say the message on its own line: a leading newline ends any partially
# printed line (such as progress dots) before the message is emitted via say.
def say_on_own_line(message)
  text = "\n" + message
  say(text)
end

#say_without_newline(message) ⇒ Object

Say the specified message without a newline



123
124
125
126
127
128
# File 'lib/etl/engine.rb', line 123

# Say the specified message without a trailing newline. Output is written to
# $stdout (and flushed immediately) only when Engine.realtime_activity is
# truthy; otherwise nothing is printed and nil is returned.
def say_without_newline(message)
  return unless Engine.realtime_activity
  $stdout.print message
  $stdout.flush
end