Class: ETL::Engine
Overview
The main ETL engine class
Class Attribute Summary
-
.average_rows_per_second ⇒ Object
Accessor for the average rows per second processed.
-
.batch ⇒ Object
Access the current ETL::Execution::Batch instance.
-
.current_destination ⇒ Object
The current destination.
-
.current_source ⇒ Object
The current source.
-
.current_source_row ⇒ Object
The current source row.
-
.job ⇒ Object
Access the current ETL::Execution::Job instance.
-
.limit ⇒ Object
The limit on rows to load from the source, useful for testing the ETL process prior to executing the entire batch.
-
.log_write_mode ⇒ Object
Accessor for the log write mode.
-
.logger ⇒ Object
Accessor for the logger.
-
.offset ⇒ Object
The offset for the source to begin at, useful for testing the ETL process prior to executing the entire batch.
-
.read_locally ⇒ Object
Set to true to read locally from the last source cache files.
-
.realtime_activity ⇒ Object
Set to true to activate realtime activity.
-
.rows_read ⇒ Object
Accessor for the total number of rows read from sources.
-
.rows_written ⇒ Object
Accessor for the total number of rows processed.
-
.skip_bulk_import ⇒ Object
Set to true to skip all bulk importing.
-
.timestamped_log ⇒ Object
Returns the value of attribute timestamped_log.
-
.use_temp_tables ⇒ Object
Set to true to use temp tables.
Class Method Summary
-
.connection(name) ⇒ Object
Get a named connection.
-
.finish ⇒ Object
Called when a batch job finishes, allowing for cleanup to occur.
-
.init(options = {}) ⇒ Object
Initialization that is run when a job is executed.
-
.process(file) ⇒ Object
Process the specified file.
-
.table(table_name, connection) ⇒ Object
Modify the table name if necessary.
-
.temp_tables ⇒ Object
Get a registry of temp tables.
-
.timestamp ⇒ Object
Get a timestamp value as a string.
-
.use_temp_tables? ⇒ Boolean
Return true if using temp tables.
Instance Method Summary
-
#benchmarks ⇒ Object
Get a Hash of benchmark values where each value represents the total amount of time in seconds spent processing in that portion of the ETL pipeline.
-
#errors ⇒ Object
Array of errors encountered during execution of the ETL process.
-
#process(file) ⇒ Object
Process a file, control object or batch object.
-
#say(message) ⇒ Object
Say the specified message, with a newline.
-
#say_on_own_line(message) ⇒ Object
Say the message on its own line.
-
#say_without_newline(message) ⇒ Object
Say the specified message without a newline.
Methods included from Util
#approximate_distance_of_time_in_words, #distance_of_time_in_words
Class Attribute Details
.average_rows_per_second ⇒ Object
Accessor for the average rows per second processed
    # File 'lib/etl/engine.rb', line 140
    def average_rows_per_second
      @average_rows_per_second
    end
.batch ⇒ Object
Access the current ETL::Execution::Batch instance
    # File 'lib/etl/engine.rb', line 121
    def batch
      @batch
    end
.current_destination ⇒ Object
The current destination
    # File 'lib/etl/engine.rb', line 99
    def current_destination
      @current_destination
    end
.current_source ⇒ Object
The current source
    # File 'lib/etl/engine.rb', line 93
    def current_source
      @current_source
    end
.current_source_row ⇒ Object
The current source row
    # File 'lib/etl/engine.rb', line 96
    def current_source_row
      @current_source_row
    end
.job ⇒ Object
Access the current ETL::Execution::Job instance
    # File 'lib/etl/engine.rb', line 118
    def job
      @job
    end
.limit ⇒ Object
The limit on rows to load from the source, useful for testing the ETL process prior to executing the entire batch. Default value is nil and indicates that there is no limit
    # File 'lib/etl/engine.rb', line 126
    def limit
      @limit
    end
.log_write_mode ⇒ Object
Accessor for the log write mode. Default is ‘a’ for append.
    # File 'lib/etl/engine.rb', line 62
    def log_write_mode
      @log_write_mode
    end
.offset ⇒ Object
The offset for the source to begin at, useful for testing the ETL process prior to executing the entire batch. Default value is nil and indicates that there is no offset
    # File 'lib/etl/engine.rb', line 131
    def offset
      @offset
    end
.read_locally ⇒ Object
Set to true to read locally from the last source cache files
    # File 'lib/etl/engine.rb', line 137
    def read_locally
      @read_locally
    end
.realtime_activity ⇒ Object
Set to true to activate realtime activity. This will cause certain information messages to be printed to STDOUT
    # File 'lib/etl/engine.rb', line 103
    def realtime_activity
      @realtime_activity
    end
.rows_read ⇒ Object
Accessor for the total number of rows read from sources
    # File 'lib/etl/engine.rb', line 106
    def rows_read
      @rows_read
    end
.rows_written ⇒ Object
Accessor for the total number of rows processed
    # File 'lib/etl/engine.rb', line 112
    def rows_written
      @rows_written
    end
.skip_bulk_import ⇒ Object
Set to true to skip all bulk importing
    # File 'lib/etl/engine.rb', line 134
    def skip_bulk_import
      @skip_bulk_import
    end
.timestamped_log ⇒ Object
Returns the value of attribute timestamped_log.
    # File 'lib/etl/engine.rb', line 59
    def timestamped_log
      @timestamped_log
    end
.use_temp_tables ⇒ Object
Set to true to use temp tables
    # File 'lib/etl/engine.rb', line 152
    def use_temp_tables
      @use_temp_tables
    end
Class Method Details
.connection(name) ⇒ Object
Get a named connection
    # File 'lib/etl/engine.rb', line 143
    def connection(name)
      logger.debug "Retrieving connection #{name}"
      conn = connections[name] ||= establish_connection(name)
      #conn.verify!(ActiveRecord::Base.verification_timeout)
      conn.reconnect! unless conn.active?
      conn
    end
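The `connections[name] ||= establish_connection(name)` line caches one connection per name. The following is a minimal sketch of that caching pattern in isolation; `ConnectionCache`, `establish`, and the string stand-in for a connection are hypothetical and are not part of ETL::Engine.

```ruby
# Hypothetical cache illustrating the `hash[key] ||= build(key)` pattern.
class ConnectionCache
  attr_reader :established

  def initialize
    @connections = {}
    @established = 0
  end

  # Returns the cached object for +name+, building it on first access only.
  def connection(name)
    @connections[name] ||= establish(name)
  end

  private

  # Stand-in for opening a real database connection.
  def establish(name)
    @established += 1
    "connection:#{name}"
  end
end

cache = ConnectionCache.new
first_lookup = cache.connection(:warehouse)
second_lookup = cache.connection(:warehouse)
```

Both lookups return the same object, so the setup cost is paid once per name. The real method additionally reconnects when the cached connection is no longer active.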
.finish ⇒ Object
Called when a batch job finishes, allowing for cleanup to occur
    # File 'lib/etl/engine.rb', line 160
    def finish
      temp_tables.each do |temp_table, mapping|
        actual_table = mapping[:table]
        #puts "move #{temp_table} to #{actual_table}"
        conn = mapping[:connection]
        conn.transaction do
          conn.rename_table(actual_table, "#{actual_table}_old")
          conn.rename_table(temp_table, actual_table)
          conn.drop_table("#{actual_table}_old")
        end
      end
    end
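To make the rename, rename, drop sequence in `.finish` easier to follow, here is a small sketch that runs the same three steps against an in-memory stand-in. `FakeConnection` and `swap_in_temp_table` are hypothetical names for illustration; a real connection would be a database adapter.

```ruby
# Hypothetical in-memory stand-in for a database connection. It only tracks
# table names and is not part of ETL::Engine.
class FakeConnection
  attr_reader :tables

  def initialize(tables)
    @tables = tables
  end

  # No real transaction semantics here; the block is simply executed.
  def transaction
    yield
  end

  def rename_table(from, to)
    raise ArgumentError, "no such table: #{from}" unless @tables.include?(from)
    @tables[@tables.index(from)] = to
  end

  def drop_table(name)
    @tables.delete(name)
  end
end

# Mirrors the three-step swap performed for each temp table registry entry.
def swap_in_temp_table(conn, temp_table, actual_table)
  conn.transaction do
    conn.rename_table(actual_table, "#{actual_table}_old")
    conn.rename_table(temp_table, actual_table)
    conn.drop_table("#{actual_table}_old")
  end
end

conn = FakeConnection.new(["customers", "tmp_customers"])
swap_in_temp_table(conn, "tmp_customers", "customers")
```

After the swap only `customers` remains, now holding what was loaded into the temp table.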
.init(options = {}) ⇒ Object
Initialization that is run when a job is executed.
Options:
-
:limit
: Limit the number of records returned from sources
-
:offset
: Specify the record offset at which to start reading data from sources
-
:log_write_mode
: If true then the log will be overwritten, otherwise it will be appended to. The method body shown here reads this flag from options[:newlog].
-
:skip_bulk_import
: Set to true to skip bulk import
-
:read_locally
: Set to true to read from the local cache
-
:rails_root
: Set to the rails root to boot rails
    # File 'lib/etl/engine.rb', line 20
    def init(options={})
      unless @initialized
        puts "initializing ETL engine\n\n"
        @limit = options[:limit]
        @offset = options[:offset]
        @log_write_mode = 'w' if options[:newlog]
        @skip_bulk_import = options[:skip_bulk_import]
        @read_locally = options[:read_locally]
        @rails_root = options[:rails_root]
        @log_dir = options[:log_dir]

        require File.join(@rails_root, 'config/environment') if @rails_root
        options[:config] ||= 'database.yml'
        options[:config] = 'config/database.yml' unless File.exist?(options[:config])
        database_configuration = YAML::load(ERB.new(IO.read(options[:config])).result + "\n")
        ActiveRecord::Base.configurations.merge!(database_configuration)
        ETL::Base.configurations = HashWithIndifferentAccess.new(database_configuration)
        #puts "configurations in init: #{ActiveRecord::Base.configurations.inspect}"

        require 'etl/execution'
        ETL::Execution::Base.establish_connection(options[:execution_conf] || :etl_execution)
        ETL::Execution::Execution.migrate

        @initialized = true
      end
    end
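The configuration step in `.init` renders the database file through ERB before parsing it as YAML, so the file may contain embedded Ruby. A minimal sketch of that step alone follows. The inline `config_text` is a made-up example that stands in for reading `database.yml` from disk, and its keys and values are assumptions.

```ruby
require 'erb'
require 'yaml'

# Hypothetical file content; in .init this text comes from IO.read(options[:config]).
config_text = <<~CONFIG
  etl_execution:
    adapter: sqlite3
    database: <%= "etl_" + "execution" %>.db
CONFIG

# Render the ERB tags first, then parse the rendered text as YAML.
database_configuration = YAML.load(ERB.new(config_text).result + "\n")
```

The ERB tag is evaluated before YAML parsing, so the resulting hash contains the computed database name rather than the template text.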
.process(file) ⇒ Object
Process the specified file. Acceptable values for file are:
-
Path to a file
-
File object
-
ETL::Control::Control instance
-
ETL::Batch::Batch instance
The process command will accept either a .ctl Control file or a .ebf ETL Batch File.
    # File 'lib/etl/engine.rb', line 55
    def process(file)
      new().process(file)
    end
.table(table_name, connection) ⇒ Object
Modify the table name if necessary
    # File 'lib/etl/engine.rb', line 179
    def table(table_name, connection)
      if use_temp_tables?
        temp_table_name = "tmp_#{table_name}"

        if temp_tables[temp_table_name].nil?
          # Create the temp table and add it to the mapping
          begin connection.drop_table(temp_table_name); rescue; end
          connection.copy_table(table_name, temp_table_name)
          temp_tables[temp_table_name] = {
            :table => table_name,
            :connection => connection
          }
        end

        temp_table_name
      else
        table_name
      end
    end
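The naming and registration logic of `.table` can be sketched without a database. In the example below, `temp_name_for` is a hypothetical helper that takes the registry and the temp-table flag as explicit arguments and omits the drop and copy steps performed by the real method.

```ruby
# Hypothetical helper: returns the table name to write to and records the
# temp-to-actual mapping the first time a temp table is requested.
def temp_name_for(table_name, registry, use_temp_tables)
  return table_name unless use_temp_tables

  temp_table_name = "tmp_#{table_name}"
  registry[temp_table_name] ||= { :table => table_name }
  temp_table_name
end

registry = {}
first = temp_name_for("orders", registry, true)
second = temp_name_for("orders", registry, true)
plain = temp_name_for("orders", registry, false)
```

Repeated requests for the same table reuse one registry entry, and with temp tables disabled the original name is returned unchanged.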
.temp_tables ⇒ Object
Get a registry of temp tables
    # File 'lib/etl/engine.rb', line 155
    def temp_tables
      @temp_tables ||= {}
    end
.timestamp ⇒ Object
Get a timestamp value as a string
    # File 'lib/etl/engine.rb', line 88
    def timestamp
      Time.now.strftime("%Y%m%d%H%M%S")
    end
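As an illustration of the format string used by `.timestamp`, the sketch below applies it to a fixed time instead of `Time.now` so the result is reproducible. The chosen date is arbitrary.

```ruby
# A fixed, arbitrary moment: 2024-03-05 14:07:09 UTC.
fixed_time = Time.utc(2024, 3, 5, 14, 7, 9)

# Same format string as .timestamp: year, month, day, hour, minute, second.
stamp = fixed_time.strftime("%Y%m%d%H%M%S")
# stamp is "20240305140709"
```

Because every field is zero-padded and ordered from most to least significant, these strings sort chronologically when compared as plain text.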
.use_temp_tables? ⇒ Boolean
Return true if using temp tables
    # File 'lib/etl/engine.rb', line 174
    def use_temp_tables?
      use_temp_tables ? true : false
    end
Instance Method Details
#benchmarks ⇒ Object
Get a Hash of benchmark values where each value represents the total amount of time in seconds spent processing in that portion of the ETL pipeline. Keys include:
-
:transforms
-
:after_reads
-
:before_writes
-
:writes
    # File 'lib/etl/engine.rb', line 248
    def benchmarks
      @benchmarks ||= {
        :transforms => 0,
        :after_reads => 0,
        :before_writes => 0,
        :writes => 0,
      }
    end
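One way such a hash could accumulate per-stage timings is sketched below. This is an assumption about usage, not the engine's actual processing loop: the `timed` helper, the sample rows, and the choice of a monotonic clock are all hypothetical.

```ruby
# Same keys and zero defaults as #benchmarks.
benchmarks = { :transforms => 0, :after_reads => 0, :before_writes => 0, :writes => 0 }

# Hypothetical helper: runs the block and adds its elapsed seconds to one stage.
def timed(benchmarks, stage)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  benchmarks[stage] += Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  result
end

rows = [{ :name => "ada" }, { :name => "grace" }]
rows.each do |row|
  timed(benchmarks, :transforms) { row[:name] = row[:name].upcase }
end
```

Each stage's total grows across rows, so after a run the hash shows where the time went.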
#errors ⇒ Object
Array of errors encountered during execution of the ETL process
    # File 'lib/etl/engine.rb', line 237
    def errors
      @errors ||= []
    end
#process(file) ⇒ Object
Process a file, control object or batch object. Acceptable values for file are:
-
Path to a file
-
File object
-
ETL::Control::Control instance
-
ETL::Batch::Batch instance
    # File 'lib/etl/engine.rb', line 263
    def process(file)
      case file
      when String
        process(File.new(file))
      when File
        case file.path
        when /.ctl/ then process_control(file)
        when /.etl/ then process_control(file)
        when /.ebf/ then process_batch(file)
        else raise RuntimeError, "Unsupported file type - #{file.path}"
        end
      when ETL::Control::Control
        process_control(file)
      when ETL::Batch::Batch
        process_batch(file)
      else
        raise RuntimeError, "Process object must be a String, File, Control instance or Batch instance"
      end
    end
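The file-type dispatch above matches patterns such as `/.ctl/` anywhere in the path. The sketch below shows a stricter variant that looks only at the file extension. `handler_for` is a hypothetical helper that returns the name of the handler instead of calling it, and it is not the engine's own implementation.

```ruby
# Hypothetical helper: maps a path to the handler that #process would call,
# based on the file extension only.
def handler_for(path)
  case File.extname(path)
  when ".ctl", ".etl" then :process_control
  when ".ebf" then :process_batch
  else raise ArgumentError, "Unsupported file type - #{path}"
  end
end

control = handler_for("loads/customers.ctl")
batch = handler_for("nightly.ebf")
```

With this variant a path like `notes.ctl.txt` is rejected, whereas an unanchored pattern would treat it as a control file.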
#say(message) ⇒ Object
Say the specified message, with a newline
    # File 'lib/etl/engine.rb', line 219
    def say(message)
      say_without_newline(message + "\n")
    end
#say_on_own_line(message) ⇒ Object
Say the message on its own line
    # File 'lib/etl/engine.rb', line 232
    def say_on_own_line(message)
      say("\n" + message)
    end
#say_without_newline(message) ⇒ Object
Say the specified message without a newline
    # File 'lib/etl/engine.rb', line 224
    def say_without_newline(message)
      if ETL::Engine.realtime_activity
        $stdout.print message
        $stdout.flush
      end
    end
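The three say methods layer on top of each other, and all output is gated by `realtime_activity`. The sketch below reproduces that layering with a hypothetical `Announcer` class that writes to an injected IO object rather than `$stdout`, so the output can be inspected.

```ruby
require 'stringio'

# Hypothetical stand-in for the engine's say methods. The output stream and
# the realtime flag are passed in instead of being read from ETL::Engine.
class Announcer
  def initialize(out, realtime_activity)
    @out = out
    @realtime_activity = realtime_activity
  end

  # Prints only when realtime activity is enabled.
  def say_without_newline(message)
    if @realtime_activity
      @out.print message
      @out.flush
    end
  end

  def say(message)
    say_without_newline(message + "\n")
  end

  def say_on_own_line(message)
    say("\n" + message)
  end
end

loud = StringIO.new
Announcer.new(loud, true).say_on_own_line("done")

quiet = StringIO.new
Announcer.new(quiet, false).say("hidden")
```

With the flag enabled the message is wrapped in a leading and a trailing newline; with it disabled nothing is written.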