Class: ETL::Engine
Overview
The main ETL engine class.
Class Attribute Summary
-
.average_rows_per_second ⇒ Object
Accessor for the average rows per second processed.
-
.batch ⇒ Object
Access the current ETL::Execution::Batch instance.
-
.current_destination ⇒ Object
The current destination.
-
.current_source ⇒ Object
The current source.
-
.current_source_row ⇒ Object
The current source row.
-
.job ⇒ Object
Access the current ETL::Execution::Job instance.
-
.limit ⇒ Object
The limit on rows to load from the source, useful for testing the ETL process prior to executing the entire batch.
-
.log_write_mode ⇒ Object
Accessor for the log write mode.
-
.logger ⇒ Object
:nodoc:.
-
.offset ⇒ Object
The offset for the source to begin at, useful for testing the ETL process prior to executing the entire batch.
-
.read_locally ⇒ Object
Set to true to read locally from the last source cache files.
-
.realtime_activity ⇒ Object
Set to true to activate realtime activity.
-
.rows_read ⇒ Object
Accessor for the total number of rows read from sources.
-
.rows_written ⇒ Object
Accessor for the total number of rows processed.
-
.skip_bulk_import ⇒ Object
Set to true to skip all bulk importing.
-
.timestamped_log ⇒ Object
Returns the value of attribute timestamped_log.
-
.use_temp_tables ⇒ Object
Set to true to use temp tables.
Class Method Summary
-
.connection(name) ⇒ Object
Get a named connection.
-
.finish ⇒ Object
Called when a batch job finishes, allowing for cleanup to occur.
-
.init(options = {}) ⇒ Object
Initialization that is run when a job is executed.
-
.process(file) ⇒ Object
Process the specified file.
-
.table(table_name, connection) ⇒ Object
Modify the table name if necessary.
-
.temp_tables ⇒ Object
Get a registry of temp tables.
-
.timestamp ⇒ Object
Get a timestamp value as a string.
-
.use_temp_tables? ⇒ Boolean
Return true if using temp tables.
Instance Method Summary
-
#benchmarks ⇒ Object
Get a Hash of benchmark values where each value represents the total amount of time in seconds spent processing in that portion of the ETL pipeline.
-
#errors ⇒ Object
Array of errors encountered during execution of the ETL process.
-
#process(file) ⇒ Object
Process a file, control object or batch object.
-
#say(message) ⇒ Object
Say the specified message, with a newline.
-
#say_on_own_line(message) ⇒ Object
Say the message on its own line.
-
#say_without_newline(message) ⇒ Object
Say the specified message without a newline.
Methods included from Util
#approximate_distance_of_time_in_words, #distance_of_time_in_words
Class Attribute Details
.average_rows_per_second ⇒ Object
Accessor for the average rows per second processed
# File 'lib/etl/engine.rb', line 135
def average_rows_per_second
  @average_rows_per_second
end
.batch ⇒ Object
Access the current ETL::Execution::Batch instance
# File 'lib/etl/engine.rb', line 116
def batch
  @batch
end
.current_destination ⇒ Object
The current destination
# File 'lib/etl/engine.rb', line 94
def current_destination
  @current_destination
end
.current_source ⇒ Object
The current source
# File 'lib/etl/engine.rb', line 88
def current_source
  @current_source
end
.current_source_row ⇒ Object
The current source row
# File 'lib/etl/engine.rb', line 91
def current_source_row
  @current_source_row
end
.job ⇒ Object
Access the current ETL::Execution::Job instance
# File 'lib/etl/engine.rb', line 113
def job
  @job
end
.limit ⇒ Object
The limit on rows to load from the source, useful for testing the ETL process prior to executing the entire batch. Default value is nil and indicates that there is no limit
# File 'lib/etl/engine.rb', line 121
def limit
  @limit
end
.log_write_mode ⇒ Object
Accessor for the log write mode. Default is ‘a’ for append.
# File 'lib/etl/engine.rb', line 61
def log_write_mode
  @log_write_mode
end
.logger ⇒ Object
:nodoc:
# File 'lib/etl/engine.rb', line 67
def logger
  @logger
end
.offset ⇒ Object
The offset for the source to begin at, useful for testing the ETL process prior to executing the entire batch. Default value is nil and indicates that there is no offset
# File 'lib/etl/engine.rb', line 126
def offset
  @offset
end
.read_locally ⇒ Object
Set to true to read locally from the last source cache files
# File 'lib/etl/engine.rb', line 132
def read_locally
  @read_locally
end
.realtime_activity ⇒ Object
Set to true to activate realtime activity. This will cause certain information messages to be printed to STDOUT
# File 'lib/etl/engine.rb', line 98
def realtime_activity
  @realtime_activity
end
.rows_read ⇒ Object
Accessor for the total number of rows read from sources
# File 'lib/etl/engine.rb', line 101
def rows_read
  @rows_read
end
.rows_written ⇒ Object
Accessor for the total number of rows processed
# File 'lib/etl/engine.rb', line 107
def rows_written
  @rows_written
end
.skip_bulk_import ⇒ Object
Set to true to skip all bulk importing
# File 'lib/etl/engine.rb', line 129
def skip_bulk_import
  @skip_bulk_import
end
.timestamped_log ⇒ Object
Returns the value of attribute timestamped_log.
# File 'lib/etl/engine.rb', line 58
def timestamped_log
  @timestamped_log
end
.use_temp_tables ⇒ Object
Set to true to use temp tables
# File 'lib/etl/engine.rb', line 147
def use_temp_tables
  @use_temp_tables
end
Class Method Details
.connection(name) ⇒ Object
Get a named connection
# File 'lib/etl/engine.rb', line 138
def connection(name)
  logger.debug "Retrieving connection #{name}"
  conn = connections[name] ||= establish_connection(name)
  #conn.verify!(ActiveRecord::Base.verification_timeout)
  conn.reconnect! unless conn.active?
  conn
end
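The listing above caches one connection per name and reconnects it when it has gone inactive. A minimal, self-contained sketch of that pattern follows; `FakeConnection` and `ConnectionCache` are hypothetical stand-ins invented for illustration and are not part of the ETL library or ActiveRecord.

```ruby
# Hypothetical connection double: it only tracks whether it is active.
class FakeConnection
  attr_reader :reconnects

  def initialize
    @active = true
    @reconnects = 0
  end

  def active?
    @active
  end

  # Simulate the server closing the connection.
  def drop!
    @active = false
  end

  def reconnect!
    @reconnects += 1
    @active = true
  end
end

# Hypothetical cache that mirrors the lookup logic of .connection.
class ConnectionCache
  attr_reader :established

  def initialize
    @connections = {}
    @established = 0
  end

  def connection(name)
    # Reuse the cached connection, creating it only on first use.
    conn = @connections[name] ||= establish_connection(name)
    conn.reconnect! unless conn.active?
    conn
  end

  private

  def establish_connection(_name)
    @established += 1
    FakeConnection.new
  end
end

cache = ConnectionCache.new
first = cache.connection(:warehouse)
first.drop!
second = cache.connection(:warehouse)
```

In this sketch the second lookup returns the same object as the first, revived through `reconnect!` rather than re-established.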
.finish ⇒ Object
Called when a batch job finishes, allowing for cleanup to occur
# File 'lib/etl/engine.rb', line 155
def finish
  temp_tables.each do |temp_table, mapping|
    actual_table = mapping[:table]
    #puts "move #{temp_table} to #{actual_table}"
    conn = mapping[:connection]
    conn.transaction do
      conn.rename_table(actual_table, "#{actual_table}_old")
      conn.rename_table(temp_table, actual_table)
      conn.drop_table("#{actual_table}_old")
    end
  end
end
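The order of operations in the listing above matters: the live table is parked under an `_old` name, the temp table is promoted, and only then is the parked copy dropped. The sketch below replays that sequence against a hypothetical `RecordingConnection` that merely logs calls, so it can run without a database. The class and the `swap_in_temp_table` helper are assumptions made for illustration.

```ruby
# Hypothetical stand-in for a database connection; it only records calls.
class RecordingConnection
  attr_reader :calls

  def initialize
    @calls = []
  end

  def transaction
    @calls << [:begin]
    yield
    @calls << [:commit]
  end

  def rename_table(from, to)
    @calls << [:rename_table, from, to]
  end

  def drop_table(name)
    @calls << [:drop_table, name]
  end
end

# Same three-step swap as in .finish: park the live table, promote the
# temp table, then drop the parked copy.
def swap_in_temp_table(conn, temp_table, actual_table)
  conn.transaction do
    conn.rename_table(actual_table, "#{actual_table}_old")
    conn.rename_table(temp_table, actual_table)
    conn.drop_table("#{actual_table}_old")
  end
end

conn = RecordingConnection.new
swap_in_temp_table(conn, "tmp_people", "people")
conn.calls.each { |call| p call }
```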
.init(options = {}) ⇒ Object
Initialization that is run when a job is executed.
Options:
-
:limit: Limit the number of records returned from sources
-
:offset: Specify the offset at which to start reading records from sources
-
:log_write_mode: If true then the log will be overwritten, otherwise it will be appended to (the source listing for this method sets write mode 'w' from a :newlog option)
-
:skip_bulk_import: Set to true to skip bulk import
-
:read_locally: Set to true to read from the local cache
-
:rails_root: Set to the rails root to boot rails
# File 'lib/etl/engine.rb', line 20
def init(options={})
  unless @initialized
    puts "initializing ETL engine\n\n"
    @limit = options[:limit]
    @offset = options[:offset]
    @log_write_mode = 'w' if options[:newlog]
    @skip_bulk_import = options[:skip_bulk_import]
    @read_locally = options[:read_locally]
    @rails_root = options[:rails_root]

    require File.join(@rails_root, 'config/environment') if @rails_root
    options[:config] ||= 'database.yml'
    options[:config] = 'config/database.yml' unless File.exist?(options[:config])
    database_configuration = YAML::load(ERB.new(IO.read(options[:config])).result + "\n")
    ActiveRecord::Base.configurations.merge!(database_configuration)
    ETL::Base.configurations = database_configuration
    #puts "configurations in init: #{ActiveRecord::Base.configurations.inspect}"

    require 'etl/execution'
    ETL::Execution::Base.establish_connection :etl_execution
    ETL::Execution::Execution.migrate

    @initialized = true
  end
end
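The configuration step in the listing above runs the file contents through ERB before parsing them as YAML, which lets the database configuration contain embedded Ruby. A minimal sketch of just that step, using only the standard library, is shown here. The configuration text and its values are made up for illustration; a real run would read them from the configured file.

```ruby
require 'erb'
require 'yaml'

# Hypothetical configuration text; a real run would read it from a file.
config_text = <<~CONFIG
  etl_execution:
    adapter: sqlite3
    database: <%= "etl_" + "execution" %>.db
CONFIG

# Expand ERB tags first, then parse the result as YAML.
database_configuration = YAML.load(ERB.new(config_text).result)

puts database_configuration["etl_execution"]["database"]
```

The `etl_execution` key is used here because the listing establishes the execution connection under that name.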
.process(file) ⇒ Object
Process the specified file. Acceptable values for file are:
-
Path to a file
-
File object
-
ETL::Control::Control instance
-
ETL::Batch::Batch instance
The process command will accept either a .ctl Control file or a .ebf ETL Batch File.
# File 'lib/etl/engine.rb', line 54
def process(file)
  new().process(file)
end
.table(table_name, connection) ⇒ Object
Modify the table name if necessary
# File 'lib/etl/engine.rb', line 174
def table(table_name, connection)
  if use_temp_tables?
    returning "tmp_#{table_name}" do |temp_table_name|
      if temp_tables[temp_table_name].nil?
        # Create the temp table and add it to the mapping
        begin connection.drop_table(temp_table_name); rescue; end
        connection.copy_table(table_name, temp_table_name)
        temp_tables[temp_table_name] = {
          :table => table_name,
          :connection => connection
        }
      end
    end
  else
    table_name
  end
end
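The listing above maps a table name to a `tmp_`-prefixed name and registers the mapping the first time it is requested. The sketch below reproduces that bookkeeping without a database. `TempTableRegistry` is a hypothetical class made up for illustration, and Ruby's built-in `tap` is used in place of `returning`, a helper from older ActiveSupport versions that yields and returns its argument in the same way.

```ruby
# Hypothetical registry, for illustration only; no database is involved.
class TempTableRegistry
  attr_reader :temp_tables, :copies

  def initialize(use_temp_tables)
    @use_temp_tables = use_temp_tables
    @temp_tables = {}
    @copies = []
  end

  def table(table_name)
    return table_name unless @use_temp_tables

    "tmp_#{table_name}".tap do |temp_table_name|
      if @temp_tables[temp_table_name].nil?
        # Stands in for connection.copy_table(table_name, temp_table_name).
        @copies << [table_name, temp_table_name]
        @temp_tables[temp_table_name] = { :table => table_name }
      end
    end
  end
end

registry = TempTableRegistry.new(true)
first = registry.table("people")
second = registry.table("people")
plain = TempTableRegistry.new(false).table("people")
```

Requesting the same table twice yields the same temp name but performs the copy only once, while a registry with temp tables disabled returns the name unchanged.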
.temp_tables ⇒ Object
Get a registry of temp tables
# File 'lib/etl/engine.rb', line 150
def temp_tables
  @temp_tables ||= {}
end
.timestamp ⇒ Object
Get a timestamp value as a string
# File 'lib/etl/engine.rb', line 83
def timestamp
  Time.now.strftime("%Y%m%d%H%M%S")
end
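To make the format string concrete, the following sketch applies the same `strftime` pattern to a fixed time instead of `Time.now`. The chosen date is arbitrary.

```ruby
# A fixed, arbitrary time so the result is reproducible.
fixed_time = Time.utc(2024, 1, 2, 3, 4, 5)

# Year, month, day, hour, minute, second, each zero-padded.
stamp = fixed_time.strftime("%Y%m%d%H%M%S")

puts stamp # prints 20240102030405
```

Because every field is zero-padded and ordered from most to least significant, these strings sort chronologically when compared as text.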
.use_temp_tables? ⇒ Boolean
Return true if using temp tables
# File 'lib/etl/engine.rb', line 169
def use_temp_tables?
  use_temp_tables ? true : false
end
Instance Method Details
#benchmarks ⇒ Object
Get a Hash of benchmark values where each value represents the total amount of time in seconds spent processing in that portion of the ETL pipeline. Keys include:
-
:transforms
-
:after_reads
-
:before_writes
-
:writes
# File 'lib/etl/engine.rb', line 239
def benchmarks
  @benchmarks ||= {
    :transforms => 0,
    :after_reads => 0,
    :before_writes => 0,
    :writes => 0,
  }
end
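A hash like the one above is typically filled by adding elapsed seconds to the matching key as each pipeline stage runs. The sketch below shows one way that accumulation could look. The `elapsed_seconds` helper and the sample rows are assumptions for illustration, and the engine's actual timing code may differ.

```ruby
# Hypothetical helper: run a block and return the seconds it took.
def elapsed_seconds
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

benchmarks = { :transforms => 0, :after_reads => 0, :before_writes => 0, :writes => 0 }

rows = [{ :name => "a" }, { :name => "b" }]
written = []
rows.each do |row|
  # Add the time spent in each stage to that stage's running total.
  benchmarks[:transforms] += elapsed_seconds { row[:name] = row[:name].upcase }
  benchmarks[:writes] += elapsed_seconds { written << row }
end

benchmarks.each { |stage, seconds| puts "#{stage}: #{seconds}" }
```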
#errors ⇒ Object
Array of errors encountered during execution of the ETL process
# File 'lib/etl/engine.rb', line 228
def errors
  @errors ||= []
end
#process(file) ⇒ Object
Process a file, control object or batch object. Acceptable values for file are:
-
Path to a file
-
File object
-
ETL::Control::Control instance
-
ETL::Batch::Batch instance
# File 'lib/etl/engine.rb', line 254
def process(file)
  case file
  when String
    process(File.new(file))
  when File
    process_control(file) if file.path =~ /.ctl$/
    process_batch(file) if file.path =~ /.ebf$/
  when ETL::Control::Control
    process_control(file)
  when ETL::Batch::Batch
    process_batch(file)
  else
    raise RuntimeError, "Process object must be a String, File, Control instance or Batch instance"
  end
end
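For File arguments, the listing above chooses between control and batch processing by matching the path against a pattern. As printed, the dot in each pattern is unescaped, so it matches any character, and a File whose path matches neither pattern is skipped without an error. The sketch below is a hypothetical helper, not part of the library, that classifies paths with the dot escaped, which may be useful when validating file names before handing them to the engine.

```ruby
# Hypothetical helper: classify a path by its extension.
def kind_for(path)
  case path
  when /\.ctl\z/ then :control
  when /\.ebf\z/ then :batch
  else :unknown
  end
end

puts kind_for("people.ctl")
puts kind_for("nightly.ebf")
puts kind_for("factl")

# With an unescaped dot, a name that merely ends in "ctl" also matches.
puts(("factl" =~ /.ctl$/).nil? ? "no match" : "match")
```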
#say(message) ⇒ Object
Say the specified message, with a newline
# File 'lib/etl/engine.rb', line 210
def say(message)
  say_without_newline(message + "\n")
end
#say_on_own_line(message) ⇒ Object
Say the message on its own line
# File 'lib/etl/engine.rb', line 223
def say_on_own_line(message)
  say("\n" + message)
end
#say_without_newline(message) ⇒ Object
Say the specified message without a newline
# File 'lib/etl/engine.rb', line 215
def say_without_newline(message)
  if ETL::Engine.realtime_activity
    $stdout.print message
    $stdout.flush
  end
end
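The three say methods layer on one another: `say_on_own_line` prepends a newline and calls `say`, which appends a newline and calls `say_without_newline`, which prints only when realtime activity is enabled. The sketch below mirrors that chain with a hypothetical `Announcer` class that writes to an injected stream instead of `$stdout`, so the output can be inspected.

```ruby
require 'stringio'

# Hypothetical stand-in for the engine's output helpers.
class Announcer
  def initialize(realtime_activity, out)
    @realtime_activity = realtime_activity
    @out = out
  end

  def say_without_newline(message)
    # Nothing is written unless realtime activity is switched on.
    return unless @realtime_activity

    @out.print message
    @out.flush
  end

  def say(message)
    say_without_newline(message + "\n")
  end

  def say_on_own_line(message)
    say("\n" + message)
  end
end

loud = StringIO.new
Announcer.new(true, loud).say_on_own_line("done")

quiet = StringIO.new
Announcer.new(false, quiet).say("done")

p loud.string
p quiet.string
```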