Class: ETL::Engine
- Inherits: Object
- Defined in: lib/etl/engine.rb
Overview
The main ETL engine class.
Class Attribute Summary
-
.average_rows_per_second ⇒ Object
Accessor for the average rows per second processed.
-
.current_destination ⇒ Object
The current destination.
-
.current_source ⇒ Object
The current source.
-
.current_source_row ⇒ Object
The current source row.
-
.job ⇒ Object
Access the current ETL::Execution::Job instance.
-
.limit ⇒ Object
The limit on rows to load from the source, useful for testing the ETL process prior to executing the entire batch.
-
.log_write_mode ⇒ Object
Accessor for the log write mode.
-
.logger ⇒ Object
The logger used by the engine (marked :nodoc: in the source).
-
.offset ⇒ Object
The offset for the source to begin at, useful for testing the ETL process prior to executing the entire batch.
-
.read_locally ⇒ Object
Set to true to read locally from the last source cache files.
-
.realtime_activity ⇒ Object
Set to true to activate realtime activity.
-
.rows_read ⇒ Object
Accessor for the total number of rows read from sources.
-
.rows_written ⇒ Object
Accessor for the total number of rows written to destinations.
-
.skip_bulk_import ⇒ Object
Set to true to skip all bulk importing.
-
.timestamped_log ⇒ Object
Returns the value of attribute timestamped_log.
Class Method Summary
-
.init(options = {}) ⇒ Object
Initialization that is run when a job is executed.
-
.process(control_file) ⇒ Object
Process the specified control file.
-
.timestamp ⇒ Object
Get a timestamp value as a string.
Instance Method Summary
-
#benchmarks ⇒ Object
Get a Hash of benchmark values where each value represents the total amount of time in seconds spent processing in that portion of the ETL pipeline.
-
#errors ⇒ Object
Array of errors encountered during execution of the ETL process.
-
#process(control) ⇒ Object
Process a control file or object.
-
#say(message) ⇒ Object
Say the specified message, with a newline.
-
#say_on_own_line(message) ⇒ Object
Say the message on its own line.
-
#say_without_newline(message) ⇒ Object
Say the specified message without a newline.
Class Attribute Details
.average_rows_per_second ⇒ Object
Accessor for the average rows per second processed
# File 'lib/etl/engine.rb', line 114
def average_rows_per_second
  @average_rows_per_second
end
.current_destination ⇒ Object
The current destination
# File 'lib/etl/engine.rb', line 76
def current_destination
  @current_destination
end
.current_source ⇒ Object
The current source
# File 'lib/etl/engine.rb', line 70
def current_source
  @current_source
end
.current_source_row ⇒ Object
The current source row
# File 'lib/etl/engine.rb', line 73
def current_source_row
  @current_source_row
end
.job ⇒ Object
Access the current ETL::Execution::Job instance
# File 'lib/etl/engine.rb', line 95
def job
  @job
end
.limit ⇒ Object
The limit on rows to load from the source, useful for testing the ETL process prior to executing the entire batch. Default value is nil, which indicates that there is no limit.
# File 'lib/etl/engine.rb', line 100
def limit
  @limit
end
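In #process, the limit is checked against the running Engine.rows_read counter before each row, and the current source stops being read once the counter reaches the limit. Because the counter is shared, later sources also stop immediately. The following is a minimal standalone sketch of that check; read_with_limit is a hypothetical helper, and plain Arrays stand in for ETL sources.

```ruby
# Hypothetical stand-in for the limit check in ETL::Engine#process.
# rows_read is shared across all sources, as Engine.rows_read is.
def read_with_limit(sources, limit)
  rows_read = 0
  processed = []
  sources.each do |source|
    source.each do |row|
      # nil means "no limit", matching the Engine.limit default
      break if !limit.nil? && rows_read >= limit
      rows_read += 1
      processed << row
    end
  end
  processed
end

p read_with_limit([[1, 2, 3], [4, 5]], 4)   # => [1, 2, 3, 4]
p read_with_limit([[1, 2, 3], [4, 5]], nil) # => [1, 2, 3, 4, 5]
```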
.log_write_mode ⇒ Object
Accessor for the log write mode. Default is ‘a’ for append.
# File 'lib/etl/engine.rb', line 43
def log_write_mode
  @log_write_mode
end
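The log write mode is a Ruby file-open mode string: the default 'a' appends to an existing log, while init switches it to 'w' (truncate) when the :newlog option is set. How the engine actually opens its log file is not shown on this page, so the sketch below only illustrates the two modes against a temporary file; write_log is a hypothetical helper.

```ruby
require 'tempfile'

# Hypothetical helper: write one line using the given file-open mode.
def write_log(path, mode, line)
  File.open(path, mode) { |f| f.puts(line) }
end

log = Tempfile.new('etl-log')
path = log.path
log.close

write_log(path, 'a', 'first run')
write_log(path, 'a', 'second run')   # 'a' keeps earlier lines
puts File.read(path)                 # prints both lines

write_log(path, 'w', 'fresh run')    # 'w' truncates, as with :newlog
puts File.read(path)                 # prints only "fresh run"
```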
.logger ⇒ Object
The logger used by the engine, for example Engine.logger.debug in #process (marked :nodoc: in the source).
# File 'lib/etl/engine.rb', line 49
def logger
  @logger
end
.offset ⇒ Object
The offset for the source to begin at, useful for testing the ETL process prior to executing the entire batch. Default value is nil, which indicates that there is no offset.
# File 'lib/etl/engine.rb', line 105
def offset
  @offset
end
.read_locally ⇒ Object
Set to true to read locally from the last source cache files
# File 'lib/etl/engine.rb', line 111
def read_locally
  @read_locally
end
.realtime_activity ⇒ Object
Set to true to activate realtime activity. This causes certain informational messages to be printed to STDOUT.
# File 'lib/etl/engine.rb', line 80
def realtime_activity
  @realtime_activity
end
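With realtime activity enabled, #process prints a dot through say_without_newline on every 1000th row (skipping the first), which serves as a coarse progress indicator. A standalone sketch of that rule, where progress_dots is a hypothetical helper and a StringIO stands in for STDOUT:

```ruby
require 'stringio'

# Mirrors the progress condition in ETL::Engine#process:
#   Engine.realtime_activity && index > 0 && index % 1000 == 0
def progress_dots(row_count, realtime_activity, out)
  row_count.times do |index|
    out.print '.' if realtime_activity && index > 0 && index % 1000 == 0
  end
  out
end

puts progress_dots(2500, true, StringIO.new).string.inspect  # => ".."
puts progress_dots(2500, false, StringIO.new).string.inspect # => ""
```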
.rows_read ⇒ Object
Accessor for the total number of rows read from sources
# File 'lib/etl/engine.rb', line 83
def rows_read
  @rows_read
end
.rows_written ⇒ Object
Accessor for the total number of rows written to destinations. Each row is counted once, when it is written to the first destination.
# File 'lib/etl/engine.rb', line 89
def rows_written
  @rows_written
end
.skip_bulk_import ⇒ Object
Set to true to skip all bulk importing
# File 'lib/etl/engine.rb', line 108
def skip_bulk_import
  @skip_bulk_import
end
.timestamped_log ⇒ Object
Returns the value of attribute timestamped_log.
# File 'lib/etl/engine.rb', line 40
def timestamped_log
  @timestamped_log
end
Class Method Details
.init(options = {}) ⇒ Object
Initialization that is run when a job is executed.
# File 'lib/etl/engine.rb', line 14
def init(options={})
  unless @initialized
    @limit = options[:limit]
    @offset = options[:offset]
    @log_write_mode = 'w' if options[:newlog]
    @skip_bulk_import = options[:skip_bulk_import]
    @read_locally = options[:read_locally]
    options[:config] ||= 'database.yml'
    database_configuration = YAML::load(ERB.new(IO.read(options[:config])).result + "\n")
    ETL::ActiveRecord::Base.configurations = database_configuration
    ActiveRecord::Base.configurations.merge!(ETL::ActiveRecord::Base.configurations)
    require 'etl/execution'
    ETL::Execution::Base.establish_connection :etl_execution
    ETL::Execution::Execution.migrate
    @initialized = true
  end
end
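init renders the database configuration file through ERB before parsing it as YAML, so the file may contain embedded Ruby, for example to read values from environment variables. The sketch below reproduces only that loading step with the standard library. load_database_config, the ETL_DB_NAME variable, and the sqlite3 adapter value are hypothetical; the sketch parses a string instead of reading options[:config] from disk, and uses YAML.safe_load, which is sufficient for plain hashes.

```ruby
require 'erb'
require 'yaml'

# Hypothetical helper mirroring the ERB-then-YAML step in ETL::Engine.init.
# init reads the template from options[:config] (default 'database.yml');
# here it is passed in as a string so the sketch is self-contained.
def load_database_config(template)
  YAML.safe_load(ERB.new(template).result + "\n")
end

ENV['ETL_DB_NAME'] ||= 'etl_dev'
template = <<~YML
  etl_execution:
    adapter: sqlite3
    database: <%= ENV['ETL_DB_NAME'] %>
YML

config = load_database_config(template)
puts config['etl_execution']['database']
```

The etl_execution key matches the connection name that init passes to establish_connection.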
.process(control_file) ⇒ Object
Process the specified control file. Acceptable values for control_file are:
-
Path to a file
-
File object
-
ETL::Control::Control instance
# File 'lib/etl/engine.rb', line 36
def process(control_file)
  new().process(control_file)
end
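ETL::Engine.process creates a fresh engine instance and delegates to #process, which first normalizes its argument with ETL::Control::Control.resolve. That method is not documented on this page; the sketch below shows one plausible way to accept the three documented input types (path, File object, or control instance) and is an assumption, not the library's code. ControlStub and resolve_control are hypothetical.

```ruby
# Hypothetical stand-in for ETL::Control::Control.
ControlStub = Struct.new(:file)

# One plausible dispatch over the three documented input types.
def resolve_control(control)
  case control
  when ControlStub then control                   # already a control instance
  when File        then ControlStub.new(control.path)
  when String      then ControlStub.new(control)  # path to a control file
  else raise ArgumentError, "unsupported control: #{control.inspect}"
  end
end

p resolve_control('jobs/customers.ctl').file   # => "jobs/customers.ctl"
```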
.timestamp ⇒ Object
Get a timestamp value as a string in the form YYYYMMDDHHMMSS (local time).
# File 'lib/etl/engine.rb', line 65
def timestamp
  Time.now.strftime("%Y%m%d%H%M%S")
end
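The format string produces a 14-digit, zero-padded local time with no separators, so such timestamps sort lexicographically in chronological order. timestamp_for is a hypothetical variant that takes the time as an argument so the output can be checked:

```ruby
# Same format string as ETL::Engine.timestamp, applied to a given time
# so the result is predictable.
def timestamp_for(time)
  time.strftime("%Y%m%d%H%M%S")
end

puts timestamp_for(Time.new(2024, 1, 2, 3, 4, 5))  # prints 20240102030405
puts timestamp_for(Time.now)                        # current local time
```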
Instance Method Details
#benchmarks ⇒ Object
Get a Hash of benchmark values where each value represents the total amount of time in seconds spent processing in that portion of the ETL pipeline. Keys include:
-
:transforms
-
:after_reads
-
:before_writes
-
:writes
# File 'lib/etl/engine.rb', line 147
def benchmarks
  @benchmarks ||= {
    :transforms => 0,
    :after_reads => 0,
    :before_writes => 0,
    :writes => 0,
  }
end
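Each stage of #process is wrapped in Benchmark.realtime, and the elapsed seconds are added to the matching key; at the end the engine reports Engine.rows_read divided by each non-zero total as rows per second. A minimal sketch of that bookkeeping, where the sleep call is a placeholder for real transform work:

```ruby
require 'benchmark'

benchmarks = { :transforms => 0, :after_reads => 0, :before_writes => 0, :writes => 0 }
rows_read = 0

3.times do
  rows_read += 1
  # Benchmark.realtime returns elapsed wall-clock seconds as a Float.
  t = Benchmark.realtime { sleep 0.001 }  # placeholder for transform work
  benchmarks[:transforms] += t unless t.nil?
end

benchmarks.each do |stage, seconds|
  # Same guard as the engine: skip stages that recorded no time.
  puts "Avg #{stage}: #{rows_read / seconds} rows/sec" if seconds > 0
end
```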
#errors ⇒ Object
Array of errors encountered during execution of the ETL process
# File 'lib/etl/engine.rb', line 136
def errors
  @errors ||= []
end
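Errors are collected as message strings, and after each failing stage #process calls exceeded_error_threshold?(control) to decide whether to stop. That method and control.error_threshold are not shown on this page, so the sketch below assumes the threshold is a maximum error count that must be exceeded; that interpretation is an assumption. ErrorCollector is hypothetical.

```ruby
# Hypothetical sketch: collect error messages and stop once a threshold
# (assumed here to be a maximum error count) is exceeded.
class ErrorCollector
  attr_reader :errors

  def initialize(error_threshold)
    @error_threshold = error_threshold
    @errors = []
  end

  def record(message)
    @errors << message
  end

  def exceeded_error_threshold?
    @errors.length > @error_threshold
  end
end

collector = ErrorCollector.new(2)
%w[a b c d].each do |row|
  collector.record("Error transforming row #{row}")
  break if collector.exceeded_error_threshold?
end
puts collector.errors.length  # prints 3
```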
#process(control) ⇒ Object
Process a control file or object. Acceptable values for control are:
-
Path to a file
-
File object
-
ETL::Control::Control instance
# File 'lib/etl/engine.rb', line 160
def process(control)
  control = ETL::Control::Control.resolve(control)

  ETL::Engine.job = ETL::Execution::Job.create!(
    :control_file => control.file,
    :status => 'executing'
  )

  execute_dependencies(control)

  start_time = Time.now
  Engine.logger.debug "Pre-processing #{control.file}"
  pre_process(control)
  Engine.logger.debug "Pre-processing complete"

  sources = control.sources
  destinations = control.destinations

  say "Skipping bulk import" if Engine.skip_bulk_import

  sources.each do |source|
    Engine.current_source = source
    Engine.logger.debug "Processing source #{source}"
    say "Source: #{source}"
    say "Limiting enabled: #{Engine.limit}" if Engine.limit != nil
    say "Offset enabled: #{Engine.offset}" if Engine.offset != nil
    source.each_with_index do |row, index|
      # Break out of the row loop if the +Engine.limit+ is specified and
      # the number of rows read exceeds that value.
      if Engine.limit != nil && Engine.rows_read >= Engine.limit
        puts "Reached limit of #{Engine.limit}"
        break
      end

      Engine.logger.debug "Row #{index}: #{row.inspect}"
      Engine.rows_read += 1
      Engine.current_source_row = index + 1
      if Engine.realtime_activity && index > 0 && index % 1000 == 0
        say_without_newline "."
      end

      # At this point a single row may be turned into multiple rows via row
      # processors all code after this line should work with the array of
      # rows rather than the single row
      rows = [row]

      t = Benchmark.realtime do
        begin
          Engine.logger.debug "Processing after read"
          control.after_read_processors.each do |processor|
            processed_rows = []
            rows.each do |row|
              processed_rows << processor.process(row)
            end
            rows = processed_rows.flatten
          end
        rescue => e
          msg = "Error processing rows after read from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
          errors << msg
          Engine.logger.error(msg)
          exceeded_error_threshold?(control) ? break : next
        end
      end
      benchmarks[:after_reads] += t unless t.nil?

      t = Benchmark.realtime do
        begin
          # execute transforms
          Engine.logger.debug "Executing transforms"
          rows.each do |row|
            control.transforms.each do |transform|
              name = transform.name.to_sym
              row[name] = transform.transform(name, row[name], row)
            end
          end
        rescue => e
          msg = "Error transforming from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
          errors << msg
          Engine.logger.error(msg)
          e.backtrace.each { |line| Engine.logger.error(line) }
          begin
            exceeded_error_threshold?(control) ? break : next
          rescue => inner_error
            puts inner_error
          end
        end
      end
      benchmarks[:transforms] += t unless t.nil?

      t = Benchmark.realtime do
        begin
          # execute row-level "before write" processing
          Engine.logger.debug "Processing before write"
          control.before_write_processors.each do |processor|
            processed_rows = []
            rows.each do |row|
              processed_rows << processor.process(row)
            end
            rows = processed_rows.flatten.compact
          end
        rescue => e
          msg = "Error processing rows before write from #{Engine.current_source} on line #{Engine.current_source_row}: #{e}"
          errors << msg
          Engine.logger.error(msg)
          e.backtrace.each { |line| Engine.logger.error(line) }
          exceeded_error_threshold?(control) ? break : next
        end
      end
      benchmarks[:before_writes] += t unless t.nil?

      t = Benchmark.realtime do
        begin
          # write the row to the destination
          destinations.each_with_index do |destination, index|
            Engine.current_destination = destination
            rows.each do |row|
              destination.write(row)
              Engine.rows_written += 1 if index == 0
            end
          end
        rescue => e
          msg = "Error writing to #{Engine.current_destination}: #{e}"
          errors << msg
          Engine.logger.error msg
          e.backtrace.each { |line| Engine.logger.error(line) }
          exceeded_error_threshold?(control) ? break : next
        end
      end
      benchmarks[:writes] += t unless t.nil?
    end

    if exceeded_error_threshold?(control)
      say_on_own_line "Exiting due to exceeding error threshold: #{control.error_threshold}"
      return
    end
  end

  destinations.each do |destination|
    destination.close
  end

  say_on_own_line "Executing post processes"
  Engine.logger.debug "Post-processing #{control.file}"
  post_process(control)
  Engine.logger.debug "Post-processing complete"
  say "Post-processing complete"

  if sources.length > 0
    say_on_own_line "Read #{Engine.rows_read} lines from sources"
  end
  if destinations.length > 0
    say "Wrote #{Engine.rows_written} lines to destinations"
  end
  say "Completed #{control.file} in #{distance_of_time_in_words(start_time)} with #{errors.length} errors."
  say "Processing average: #{Engine.average_rows_per_second} rows/sec)"

  say "Avg after_reads: #{Engine.rows_read/benchmarks[:after_reads]} rows/sec" if benchmarks[:after_reads] > 0
  say "Avg before_writes: #{Engine.rows_read/benchmarks[:before_writes]} rows/sec" if benchmarks[:before_writes] > 0
  say "Avg transforms: #{Engine.rows_read/benchmarks[:transforms]} rows/sec" if benchmarks[:transforms] > 0
  say "Avg writes: #{Engine.rows_read/benchmarks[:writes]} rows/sec" if benchmarks[:writes] > 0

  say "Avg time writing execution records: #{ETL::Execution::Record.average_time_spent}"

  # ETL::Transform::Transform.benchmarks.each do |klass, t|
  #   say "Avg #{klass}: #{Engine.rows_read/t} rows/sec"
  # end

  ETL::Engine.job.completed_at = Time.now
  ETL::Engine.job.status = (errors.length > 0 ? 'completed with errors' : 'completed')
  ETL::Engine.job.save!
end
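For each source row, #process runs four stages in order: after-read processors (whose results are flattened, so one row can become several), transforms applied field by field, before-write processors (flattened and compacted, so a nil result drops the row), and writes to every destination, with rows_written counted only for the first destination. The sketch below models that flow with lambdas and arrays standing in for processor, transform, and destination objects. All names here, including run_row, are hypothetical, and error handling and benchmarking are omitted.

```ruby
# Hypothetical stand-ins for control objects: each processor maps one row
# to a row, an array of rows, or nil.
after_read   = [->(row) { [row, row.merge(:copy => true)] }]  # 1 row -> 2 rows
transforms   = { :name => ->(value, _row) { value.upcase } }  # field => transform
before_write = [->(row) { row[:copy] ? nil : row }]           # nil drops a row
destinations = [[], []]                                       # arrays as sinks

rows_written = 0

run_row = lambda do |row|
  rows = [row]
  after_read.each { |processor| rows = rows.map { |r| processor.call(r) }.flatten }
  rows.each do |r|
    transforms.each { |name, transform| r[name] = transform.call(r[name], r) }
  end
  before_write.each { |processor| rows = rows.map { |r| processor.call(r) }.flatten.compact }
  destinations.each_with_index do |dest, index|
    rows.each do |r|
      dest << r
      rows_written += 1 if index == 0  # count only the first destination
    end
  end
end

[{ :name => 'alice' }, { :name => 'bob' }].each { |row| run_row.call(row) }
p destinations[0].map { |r| r[:name] }  # => ["ALICE", "BOB"]
p rows_written                          # => 2
```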
#say(message) ⇒ Object
Say the specified message, with a newline
# File 'lib/etl/engine.rb', line 118
def say(message)
  say_without_newline(message + "\n")
end
#say_on_own_line(message) ⇒ Object
Say the message on its own line
# File 'lib/etl/engine.rb', line 131
def say_on_own_line(message)
  say("\n" + message)
end
#say_without_newline(message) ⇒ Object
Say the specified message without a newline. Output is written to $stdout (and flushed) only when Engine.realtime_activity is true.
# File 'lib/etl/engine.rb', line 123
def say_without_newline(message)
  if Engine.realtime_activity
    $stdout.print message
    $stdout.flush
  end
end
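The three say helpers are layered: say appends a newline, say_on_own_line prepends one so the message starts below any progress dots, and only say_without_newline actually writes, and only when Engine.realtime_activity is true. A self-contained sketch of the same layering, using a hypothetical Announcer class and a StringIO in place of $stdout:

```ruby
require 'stringio'

# Hypothetical class reproducing the say/say_without_newline/say_on_own_line
# layering; realtime_activity plays the role of Engine.realtime_activity.
class Announcer
  def initialize(out, realtime_activity)
    @out = out
    @realtime_activity = realtime_activity
  end

  def say(message)
    say_without_newline(message + "\n")
  end

  def say_on_own_line(message)
    say("\n" + message)
  end

  def say_without_newline(message)
    return unless @realtime_activity  # output is suppressed otherwise
    @out.print message
    @out.flush
  end
end

out = StringIO.new
announcer = Announcer.new(out, true)
announcer.say_without_newline '.'
announcer.say_on_own_line 'Executing post processes'
p out.string  # => ".\nExecuting post processes\n"
```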