Class: ETL::Engine
Overview
The main ETL engine class.
Class Attribute Summary
-
.average_rows_per_second ⇒ Object
Accessor for the average rows per second processed.
-
.batch ⇒ Object
Access the current ETL::Execution::Batch instance.
-
.current_destination ⇒ Object
The current destination.
-
.current_source ⇒ Object
The current source.
-
.current_source_row ⇒ Object
The current source row.
-
.exit_code ⇒ Object
Exit code to be passed to the command line.
-
.job ⇒ Object
Access the current ETL::Execution::Job instance.
-
.limit ⇒ Object
The limit on rows to load from the source, useful for testing the ETL process prior to executing the entire batch.
-
.log_write_mode ⇒ Object
Accessor for the log write mode.
-
.logger ⇒ Object
Accessor for the engine's logger.
-
.offset ⇒ Object
The offset for the source to begin at, useful for testing the ETL process prior to executing the entire batch.
-
.read_locally ⇒ Object
Set to true to read locally from the last source cache files.
-
.realtime_activity ⇒ Object
Set to true to activate realtime activity.
-
.rows_read ⇒ Object
Accessor for the total number of rows read from sources.
-
.rows_written ⇒ Object
Accessor for the total number of rows processed.
-
.skip_bulk_import ⇒ Object
Set to true to skip all bulk importing.
-
.timestamped_log ⇒ Object
Returns the value of attribute timestamped_log.
-
.use_temp_tables ⇒ Object
Set to true to use temp tables.
Class Method Summary
-
.connection(name) ⇒ Object
Get a named connection.
-
.finish ⇒ Object
Called when a batch job finishes, allowing for cleanup to occur.
-
.init(options = {}) ⇒ Object
Initialization that is run when a job is executed.
-
.process(file) ⇒ Object
Process the specified file.
-
.table(table_name, connection) ⇒ Object
Modify the table name if necessary.
-
.temp_tables ⇒ Object
Get a registry of temp tables.
-
.timestamp ⇒ Object
Get a timestamp value as a string.
-
.use_temp_tables? ⇒ Boolean
Return true if using temp tables.
Instance Method Summary
-
#benchmarks ⇒ Object
Get a Hash of benchmark values where each value represents the total amount of time in seconds spent processing in that portion of the ETL pipeline.
-
#errors ⇒ Object
Array of errors encountered during execution of the ETL process.
-
#process(file) ⇒ Object
Process a file, control object or batch object.
-
#say(message) ⇒ Object
Say the specified message, with a newline.
-
#say_on_own_line(message) ⇒ Object
Say the message on its own line.
-
#say_without_newline(message) ⇒ Object
Say the specified message without a newline.
-
#track_error(control, msg) ⇒ Object
First attempt at centralizing error notifications.
Methods included from Util
#approximate_distance_of_time_in_words, #distance_of_time_in_words
Class Attribute Details
.average_rows_per_second ⇒ Object
Accessor for the average rows per second processed
    # File 'lib/etl/engine.rb', line 138
    def average_rows_per_second
      @average_rows_per_second
    end
.batch ⇒ Object
Access the current ETL::Execution::Batch instance
    # File 'lib/etl/engine.rb', line 119
    def batch
      @batch
    end
.current_destination ⇒ Object
The current destination
    # File 'lib/etl/engine.rb', line 97
    def current_destination
      @current_destination
    end
.current_source ⇒ Object
The current source
    # File 'lib/etl/engine.rb', line 91
    def current_source
      @current_source
    end
.current_source_row ⇒ Object
The current source row
    # File 'lib/etl/engine.rb', line 94
    def current_source_row
      @current_source_row
    end
.exit_code ⇒ Object
Exit code to be passed to the command line
    # File 'lib/etl/engine.rb', line 88
    def exit_code
      @exit_code
    end
.job ⇒ Object
Access the current ETL::Execution::Job instance
    # File 'lib/etl/engine.rb', line 116
    def job
      @job
    end
.limit ⇒ Object
The limit on rows to load from the source, useful for testing the ETL process prior to executing the entire batch. Default value is nil and indicates that there is no limit
    # File 'lib/etl/engine.rb', line 124
    def limit
      @limit
    end
.log_write_mode ⇒ Object
Accessor for the log write mode. Default is ‘a’ for append.
    # File 'lib/etl/engine.rb', line 61
    def log_write_mode
      @log_write_mode
    end
.offset ⇒ Object
The offset for the source to begin at, useful for testing the ETL process prior to executing the entire batch. Default value is nil and indicates that there is no offset
    # File 'lib/etl/engine.rb', line 129
    def offset
      @offset
    end
.read_locally ⇒ Object
Set to true to read locally from the last source cache files
    # File 'lib/etl/engine.rb', line 135
    def read_locally
      @read_locally
    end
.realtime_activity ⇒ Object
Set to true to activate realtime activity. This will cause certain information messages to be printed to STDOUT
    # File 'lib/etl/engine.rb', line 101
    def realtime_activity
      @realtime_activity
    end
.rows_read ⇒ Object
Accessor for the total number of rows read from sources
    # File 'lib/etl/engine.rb', line 104
    def rows_read
      @rows_read
    end
.rows_written ⇒ Object
Accessor for the total number of rows processed
    # File 'lib/etl/engine.rb', line 110
    def rows_written
      @rows_written
    end
.skip_bulk_import ⇒ Object
Set to true to skip all bulk importing
    # File 'lib/etl/engine.rb', line 132
    def skip_bulk_import
      @skip_bulk_import
    end
.timestamped_log ⇒ Object
Returns the value of attribute timestamped_log.
    # File 'lib/etl/engine.rb', line 58
    def timestamped_log
      @timestamped_log
    end
.use_temp_tables ⇒ Object
Set to true to use temp tables
    # File 'lib/etl/engine.rb', line 150
    def use_temp_tables
      @use_temp_tables
    end
Class Method Details
.connection(name) ⇒ Object
Get a named connection
    # File 'lib/etl/engine.rb', line 141
    def connection(name)
      logger.debug "Retrieving connection #{name}"
      conn = connections[name] ||= establish_connection(name)
      #conn.verify!(ActiveRecord::Base.verification_timeout)
      conn.reconnect! unless conn.active?
      conn
    end
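The listing above caches each connection by name and reconnects it if it has gone inactive. The following is a minimal, self-contained sketch of that memoize-then-verify pattern. `FakeConnection` and `ConnectionRegistry` are hypothetical stand-ins invented for illustration; they are not part of the ETL library or of ActiveRecord.

```ruby
# Hypothetical stand-in for a database connection (not a library class).
class FakeConnection
  attr_reader :name, :reconnects

  def initialize(name)
    @name = name
    @active = true
    @reconnects = 0
  end

  def active?
    @active
  end

  # Simulate the server dropping the connection.
  def drop!
    @active = false
  end

  def reconnect!
    @reconnects += 1
    @active = true
  end
end

# Hypothetical registry that mirrors the caching logic shown above.
class ConnectionRegistry
  def initialize
    @connections = {}
  end

  def connection(name)
    # Create the connection on first use, then reuse the cached instance.
    conn = @connections[name] ||= FakeConnection.new(name)
    # Revive a cached connection that has gone stale.
    conn.reconnect! unless conn.active?
    conn
  end
end

registry = ConnectionRegistry.new
first = registry.connection(:warehouse)
first.drop!
second = registry.connection(:warehouse)
```

In this sketch the second lookup returns the same object as the first, reconnected once, rather than a new connection.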
.finish ⇒ Object
Called when a batch job finishes, allowing for cleanup to occur
    # File 'lib/etl/engine.rb', line 158
    def finish
      temp_tables.each do |temp_table, mapping|
        actual_table = mapping[:table]
        #puts "move #{temp_table} to #{actual_table}"
        conn = mapping[:connection]
        conn.transaction do
          conn.rename_table(actual_table, "#{actual_table}_old")
          conn.rename_table(temp_table, actual_table)
          conn.drop_table("#{actual_table}_old")
        end
      end
    end
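To make the order of operations in the swap easier to follow, here is a small sketch that replays the same loop against a connection that only records what it is asked to do. `RecordingConnection` and the table name `sales` are assumptions made for illustration; a real connection would run these statements against the database.

```ruby
# Hypothetical connection that records operations instead of executing them.
class RecordingConnection
  attr_reader :operations

  def initialize
    @operations = []
  end

  # Stand-in for a real transaction: simply runs the block.
  def transaction
    yield
  end

  def rename_table(from, to)
    @operations << [:rename, from, to]
  end

  def drop_table(name)
    @operations << [:drop, name]
  end
end

conn = RecordingConnection.new
# Same shape as the temp table registry: temp name => target table and connection.
temp_tables = { "tmp_sales" => { :table => "sales", :connection => conn } }

temp_tables.each do |temp_table, mapping|
  actual_table = mapping[:table]
  c = mapping[:connection]
  c.transaction do
    c.rename_table(actual_table, "#{actual_table}_old") # move the live table aside
    c.rename_table(temp_table, actual_table)            # promote the temp table
    c.drop_table("#{actual_table}_old")                 # discard the old data
  end
end
```

The recorded sequence is: rename `sales` to `sales_old`, rename `tmp_sales` to `sales`, then drop `sales_old`.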
.init(options = {}) ⇒ Object
Initialization that is run when a job is executed.
Options:
-
:limit: Limit the number of records returned from sources
-
:offset: Specify the offset at which to start reading records from sources
-
:log_write_mode: If true then the log will be overwritten, otherwise it will be appended to (the source listing for this method sets write mode 'w' when :newlog is given)
-
:skip_bulk_import: Set to true to skip bulk import
-
:read_locally: Set to true to read from the local cache
-
:rails_root: Set to the Rails root to boot Rails
    # File 'lib/etl/engine.rb', line 20
    def init(options={})
      unless @initialized
        puts "initializing ETL engine\n\n"
        @limit = options[:limit]
        @offset = options[:offset]
        @log_write_mode = 'w' if options[:newlog]
        @skip_bulk_import = options[:skip_bulk_import]
        @read_locally = options[:read_locally]
        @rails_root = options[:rails_root]

        require File.join(@rails_root, 'config/environment') if @rails_root
        options[:config] ||= 'database.yml'
        options[:config] = 'config/database.yml' unless File.exist?(options[:config])
        database_configuration = YAML::load(ERB.new(IO.read(options[:config])).result + "\n")
        ActiveRecord::Base.configurations.merge!(database_configuration)
        ETL::Base.configurations = HashWithIndifferentAccess.new(database_configuration)
        #puts "configurations in init: #{ActiveRecord::Base.configurations.inspect}"

        require 'etl/execution'
        ETL::Execution::Base.establish_connection :etl_execution
        ETL::Execution::Execution.migrate

        @initialized = true
      end
    end
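The initializer above passes the database configuration file through ERB before parsing it as YAML, so the file may contain embedded Ruby. The sketch below shows only that two-step load, using an inline string in place of a file. The adapter and database values are made up for illustration, and only the `etl_execution` key is taken from the listing above.

```ruby
require 'erb'
require 'yaml'

# Hypothetical database.yml content; the values are illustrative only.
raw = <<~CONFIG
  etl_execution:
    adapter: sqlite3
    database: <%= "etl_" + "execution" %>.db
CONFIG

# Step 1: evaluate the ERB tags. Step 2: parse the rendered text as YAML.
rendered = ERB.new(raw).result
database_configuration = YAML.load(rendered)

# The rendered database name is available under the string key.
database_name = database_configuration["etl_execution"]["database"]
```

Because ERB runs first, a configuration file can compute values (for example from environment variables) before YAML ever sees them.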
.process(file) ⇒ Object
Process the specified file. Acceptable values for file are:
-
Path to a file
-
File object
-
ETL::Control::Control instance
-
ETL::Batch::Batch instance
The process command will accept either a .ctl or .ctl.rb for a Control file or a .ebf
or .ebf.rb for an ETL Batch File.
    # File 'lib/etl/engine.rb', line 54
    def process(file)
      new().process(file)
    end
.table(table_name, connection) ⇒ Object
Modify the table name if necessary
    # File 'lib/etl/engine.rb', line 177
    def table(table_name, connection)
      if use_temp_tables?
        temp_table_name = "tmp_#{table_name}"

        if temp_tables[temp_table_name].nil?
          # Create the temp table and add it to the mapping
          begin connection.drop_table(temp_table_name); rescue; end
          connection.copy_table(table_name, temp_table_name)
          temp_tables[temp_table_name] = {
            :table => table_name,
            :connection => connection
          }
        end

        temp_table_name
      else
        table_name
      end
    end
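As a rough illustration of the name mapping above, the following sketch resolves a table name with and without temp tables enabled and registers each temp table only once. `TableResolver` and `StubConnection` are hypothetical names used for this example; the stub only records calls, and the sketch omits the preliminary drop of a leftover temp table.

```ruby
# Hypothetical connection that records copy requests instead of running them.
class StubConnection
  attr_reader :calls

  def initialize
    @calls = []
  end

  def copy_table(from, to)
    @calls << [:copy_table, from, to]
  end
end

# Hypothetical resolver mirroring the mapping logic shown above.
class TableResolver
  attr_reader :temp_tables

  def initialize(use_temp_tables)
    @use_temp_tables = use_temp_tables
    @temp_tables = {}
  end

  def table(table_name, connection)
    return table_name unless @use_temp_tables

    temp_table_name = "tmp_#{table_name}"
    if @temp_tables[temp_table_name].nil?
      # First request for this table: copy it and remember the mapping.
      connection.copy_table(table_name, temp_table_name)
      @temp_tables[temp_table_name] = { :table => table_name, :connection => connection }
    end
    temp_table_name
  end
end

conn = StubConnection.new
with_temp = TableResolver.new(true)
first_name = with_temp.table("sales", conn)
second_name = with_temp.table("sales", conn)
plain_name = TableResolver.new(false).table("sales", conn)
```

Both lookups with temp tables enabled return `tmp_sales`, but the copy happens only on the first one; with temp tables disabled the original name is returned unchanged.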
.temp_tables ⇒ Object
Get a registry of temp tables
    # File 'lib/etl/engine.rb', line 153
    def temp_tables
      @temp_tables ||= {}
    end
.timestamp ⇒ Object
Get a timestamp value as a string
    # File 'lib/etl/engine.rb', line 83
    def timestamp
      Time.now.strftime("%Y%m%d%H%M%S")
    end
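The format string `%Y%m%d%H%M%S` produces a fixed-width, 14-digit value. A short sketch with fixed times (chosen arbitrarily here so the result is reproducible) shows the shape of the string and that such values sort chronologically when compared as strings.

```ruby
# Fixed, arbitrary times so the output does not depend on the clock.
later   = Time.new(2024, 3, 5, 14, 7, 9)
earlier = Time.new(2023, 12, 31, 23, 59, 59)

later_stamp   = later.strftime("%Y%m%d%H%M%S")   # "20240305140709"
earlier_stamp = earlier.strftime("%Y%m%d%H%M%S") # "20231231235959"

# Zero-padded fields mean plain string comparison follows time order.
sorted = [later_stamp, earlier_stamp].sort
```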
.use_temp_tables? ⇒ Boolean
Return true if using temp tables
    # File 'lib/etl/engine.rb', line 172
    def use_temp_tables?
      use_temp_tables ? true : false
    end
Instance Method Details
#benchmarks ⇒ Object
Get a Hash of benchmark values where each value represents the total amount of time in seconds spent processing in that portion of the ETL pipeline. Keys include:
-
:transforms
-
:after_reads
-
:before_writes
-
:writes
    # File 'lib/etl/engine.rb', line 254
    def benchmarks
      @benchmarks ||= {
        :transforms => 0,
        :after_reads => 0,
        :before_writes => 0,
        :writes => 0,
      }
    end
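One way such a hash could be filled is by timing each stage and adding the elapsed seconds to the matching key. The sketch below is an assumption about usage, not the engine's actual pipeline code: `elapsed_seconds` is a hypothetical helper, and the row data and stage bodies are placeholders.

```ruby
# Hypothetical helper: run the block and return the elapsed wall time in seconds.
def elapsed_seconds
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

# Same keys as the hash returned by #benchmarks.
benchmarks = { :transforms => 0, :after_reads => 0, :before_writes => 0, :writes => 0 }

rows = [{ :name => "a" }, { :name => "b" }]
written = []

rows.each do |row|
  # Accumulate time per stage across all rows.
  benchmarks[:transforms] += elapsed_seconds { row[:name] = row[:name].upcase }
  benchmarks[:writes]     += elapsed_seconds { written << row[:name] }
end
```

After the loop, each value holds the total time spent in that stage across every row, which matches the "total amount of time in seconds" described above.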
#errors ⇒ Object
Array of errors encountered during execution of the ETL process
    # File 'lib/etl/engine.rb', line 235
    def errors
      @errors ||= []
    end
#process(file) ⇒ Object
Process a file, control object or batch object. Acceptable values for file are:
-
Path to a file
-
File object
-
ETL::Control::Control instance
-
ETL::Batch::Batch instance
    # File 'lib/etl/engine.rb', line 269
    def process(file)
      case file
      when String
        process(File.new(file))
      when File
        case file.path
        when /\.ctl(\.rb)?$/; process_control(file)
        when /\.ebf(\.rb)?$/; process_batch(file)
        else raise RuntimeError, "Unsupported file type - #{file.path}"
        end
      when ETL::Control::Control
        process_control(file)
      when ETL::Batch::Batch
        process_batch(file)
      else
        raise RuntimeError, "Process object must be a String, File, Control instance or Batch instance"
      end
    end
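The file-type dispatch above depends on two regular expressions applied to the file path. The sketch below isolates that matching in a hypothetical `classify` helper (not part of the library) that returns a symbol instead of invoking the control or batch processor. The file names are invented.

```ruby
# Hypothetical helper: classify a path with the same patterns as the dispatch above.
def classify(path)
  case path
  when /\.ctl(\.rb)?$/ then :control
  when /\.ebf(\.rb)?$/ then :batch
  else raise RuntimeError, "Unsupported file type - #{path}"
  end
end

kinds = %w[load_sales.ctl load_sales.ctl.rb nightly.ebf nightly.ebf.rb].map do |path|
  classify(path)
end

# Any other extension is rejected with the same error message format.
rejected =
  begin
    classify("notes.txt")
  rescue RuntimeError => e
    e.message
  end
```

Here `kinds` is `[:control, :control, :batch, :batch]`, and `rejected` holds the "Unsupported file type" message for `notes.txt`.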
#say(message) ⇒ Object
Say the specified message, with a newline
    # File 'lib/etl/engine.rb', line 217
    def say(message)
      say_without_newline(message + "\n")
    end
#say_on_own_line(message) ⇒ Object
Say the message on its own line
    # File 'lib/etl/engine.rb', line 230
    def say_on_own_line(message)
      say("\n" + message)
    end
#say_without_newline(message) ⇒ Object
Say the specified message without a newline
    # File 'lib/etl/engine.rb', line 222
    def say_without_newline(message)
      if ETL::Engine.realtime_activity
        $stdout.print message
        $stdout.flush
      end
    end
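The three `say` methods build on one another, and all output is gated by the realtime activity flag. The sketch below reproduces that layering in a hypothetical `Announcer` class that writes to an injected `StringIO` instead of `$stdout`, so the exact output can be inspected. The class name and the constructor arguments are assumptions made for this example.

```ruby
require 'stringio'

# Hypothetical class mirroring the three say methods, with injectable output.
class Announcer
  def initialize(out, realtime_activity)
    @out = out
    @realtime_activity = realtime_activity
  end

  def say(message)
    say_without_newline(message + "\n")
  end

  def say_on_own_line(message)
    say("\n" + message)
  end

  def say_without_newline(message)
    # Nothing is printed unless realtime activity is switched on.
    if @realtime_activity
      @out.print message
      @out.flush
    end
  end
end

loud_out = StringIO.new
loud = Announcer.new(loud_out, true)
loud.say("Reading")
loud.say_on_own_line("Done")

quiet_out = StringIO.new
Announcer.new(quiet_out, false).say("Reading")
```

With the flag on, the captured output is `"Reading\n\nDone\n"`; with it off, nothing is written.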
#track_error(control, msg) ⇒ Object
First attempt at centralizing error notifications
    # File 'lib/etl/engine.rb', line 240
    def track_error(control, msg)
      errors << msg
      control.error_handlers.each do |handler|
        handler.call(msg)
      end
    end
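To show how an error flows through this method, the following sketch records a message and forwards it to each handler attached to a control object. `SketchControl` and `ErrorTracker` are hypothetical stand-ins. The only assumption about a real control object is the one visible in the listing above: it exposes `error_handlers`, a collection of callables.

```ruby
# Hypothetical stand-in for a control object that exposes error_handlers.
SketchControl = Struct.new(:error_handlers)

# Hypothetical class mirroring #errors and #track_error.
class ErrorTracker
  def errors
    @errors ||= []
  end

  def track_error(control, msg)
    errors << msg                                               # keep a central record
    control.error_handlers.each { |handler| handler.call(msg) } # notify every handler
  end
end

notified = []
handler = lambda { |msg| notified << "handler saw: #{msg}" }
control = SketchControl.new([handler])

tracker = ErrorTracker.new
tracker.track_error(control, "row 3: missing id")
```

After the call, the message appears both in `tracker.errors` and in the handler's own record, which is the centralizing behavior the description refers to.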