Class: ETL::Engine

Inherits:
Object show all
Includes:
Util
Defined in:
lib/etl/engine.rb

Overview

The main ETL engine class.

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Util

#approximate_distance_of_time_in_words, #distance_of_time_in_words

Class Attribute Details

.average_rows_per_secondObject

Accessor for the average rows per second processed



140
141
142
# File 'lib/etl/engine.rb', line 140

# Average rows per second processed (class-level @average_rows_per_second).
def average_rows_per_second; @average_rows_per_second; end

.batchObject

Access the current ETL::Execution::Batch instance



121
122
123
# File 'lib/etl/engine.rb', line 121

# Current ETL::Execution::Batch instance (class-level @batch).
def batch; @batch; end

.current_destinationObject

The current destination



99
100
101
# File 'lib/etl/engine.rb', line 99

# Current destination (class-level @current_destination).
def current_destination; @current_destination; end

.current_sourceObject

The current source



93
94
95
# File 'lib/etl/engine.rb', line 93

# Current source (class-level @current_source).
def current_source; @current_source; end

.current_source_rowObject

The current source row



96
97
98
# File 'lib/etl/engine.rb', line 96

# Current source row (class-level @current_source_row).
def current_source_row; @current_source_row; end

.jobObject

Access the current ETL::Execution::Job instance



118
119
120
# File 'lib/etl/engine.rb', line 118

# Current ETL::Execution::Job instance (class-level @job).
def job; @job; end

.limitObject

The limit on rows to load from the source, useful for testing the ETL process prior to executing the entire batch. Default value is nil and indicates that there is no limit.



126
127
128
# File 'lib/etl/engine.rb', line 126

# Row limit applied to sources; nil means no limit (class-level @limit).
def limit; @limit; end

.log_write_modeObject

Accessor for the log write mode. Default is ‘a’ for append.



62
63
64
# File 'lib/etl/engine.rb', line 62

# Log file write mode, documented default 'a' for append (class-level @log_write_mode).
def log_write_mode; @log_write_mode; end

.loggerObject

Accessor for the engine logger.



68
69
70
# File 'lib/etl/engine.rb', line 68

# Engine logger (class-level @logger).
def logger; @logger; end

.offsetObject

The offset for the source to begin at, useful for testing the ETL process prior to executing the entire batch. Default value is nil and indicates that there is no offset.



131
132
133
# File 'lib/etl/engine.rb', line 131

# Starting offset for sources; nil means no offset (class-level @offset).
def offset; @offset; end

.read_locallyObject

Set to true to read locally from the last source cache files



137
138
139
# File 'lib/etl/engine.rb', line 137

# When true, sources read from the last local cache files (class-level @read_locally).
def read_locally; @read_locally; end

.realtime_activityObject

Set to true to activate realtime activity. This causes certain informational messages to be printed to STDOUT.



103
104
105
# File 'lib/etl/engine.rb', line 103

# When true, informational messages are printed to STDOUT (class-level @realtime_activity).
def realtime_activity; @realtime_activity; end

.rows_readObject

Accessor for the total number of rows read from sources



106
107
108
# File 'lib/etl/engine.rb', line 106

# Total number of rows read from sources (class-level @rows_read).
def rows_read; @rows_read; end

.rows_writtenObject

Accessor for the total number of rows processed



112
113
114
# File 'lib/etl/engine.rb', line 112

# Total number of rows processed (class-level @rows_written).
def rows_written; @rows_written; end

.skip_bulk_importObject

Set to true to skip all bulk importing



134
135
136
# File 'lib/etl/engine.rb', line 134

# When true, all bulk importing is skipped (class-level @skip_bulk_import).
def skip_bulk_import; @skip_bulk_import; end

.timestamped_logObject

Returns the value of attribute timestamped_log.



59
60
61
# File 'lib/etl/engine.rb', line 59

# Value of the timestamped_log attribute (class-level @timestamped_log).
def timestamped_log; @timestamped_log; end

.use_temp_tablesObject

Set to true to use temp tables



152
153
154
# File 'lib/etl/engine.rb', line 152

# Raw temp-table flag; use_temp_tables? gives the boolean form (class-level @use_temp_tables).
def use_temp_tables; @use_temp_tables; end

Class Method Details

.connection(name) ⇒ Object

Get a named connection



143
144
145
146
147
148
149
# File 'lib/etl/engine.rb', line 143

# Return the connection cached under +name+, establishing it on first use.
#
# A cached connection that reports itself inactive is reconnected before it
# is handed back, so callers always receive the same (live) object.
#
# @param name connection name, passed through to establish_connection
# @return the cached connection object
def connection(name)
  logger.debug "Retrieving connection #{name}"
  cache = connections
  cached = cache[name] || (cache[name] = establish_connection(name))
  cached.tap { |conn| conn.reconnect! unless conn.active? }
end

.finishObject

Called when a batch job finishes, allowing for cleanup to occur



160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/etl/engine.rb', line 160

# Called when a batch job finishes: swap every registered temp table into
# place of its real table.
#
# For each temp_tables entry (temp name => {:table, :connection}) this runs,
# inside conn.transaction:
#   1. rename <table>  -> <table>_old
#   2. rename <temp>   -> <table>
#   3. drop   <table>_old
#
# Each entry is removed from the registry once its swap has completed. A later
# batch in the same process then recreates its temp table through .table
# instead of being handed a temp name that no longer exists. Calling finish
# again does not try to rename already-swapped tables. If a swap raises,
# that entry (and any not yet processed) stays registered.
#
# NOTE(review): whether the rename/drop sequence is truly atomic depends on
# the adapter supporting transactional DDL - confirm for the target database.
def finish
  # Iterate over a snapshot of the keys because entries are deleted as we go.
  temp_tables.keys.each do |temp_table|
    mapping = temp_tables[temp_table]
    actual_table = mapping[:table]
    conn = mapping[:connection]
    conn.transaction do
      conn.rename_table(actual_table, "#{actual_table}_old")
      conn.rename_table(temp_table, actual_table)
      conn.drop_table("#{actual_table}_old")
    end
    temp_tables.delete(temp_table)
  end
end

.init(options = {}) ⇒ Object

Initialization that is run when a job is executed.

Options:

  • :limit: Limit the number of records returned from sources

  • :offset: The offset at which to begin reading records from sources

  • :newlog: If true, the log is opened in write mode ('w') and overwritten; otherwise it is appended to

  • :skip_bulk_import: Set to true to skip bulk import

  • :read_locally: Set to true to read from the local cache

  • :rails_root: Set to the rails root to boot rails



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/etl/engine.rb', line 20

# One-time engine initialization run when a job is executed. Later calls are
# no-ops.
#
# Stores engine options and optionally boots Rails. It then loads the
# ERB-expanded database YAML and registers it with ActiveRecord and
# ETL::Base. Finally it connects and migrates the ETL execution schema.
#
# @param options [Hash]
#   :limit            - row limit for sources
#   :offset           - starting offset for sources
#   :newlog           - when true, the log is opened in 'w' (overwrite) mode
#   :skip_bulk_import - skip all bulk importing
#   :read_locally     - read from the local source cache
#   :rails_root       - Rails root; config/environment is required from it
#   :log_dir          - log directory
#   :config           - database YAML path (default 'database.yml', falling
#                       back to 'config/database.yml' if that does not exist)
#   :execution_conf   - connection name for ETL::Execution::Base
#                       (default :etl_execution)
# @return [true, nil] true on first initialization, nil if already initialized
def init(options={})
  return if @initialized

  puts "initializing ETL engine\n\n"
  @limit = options[:limit]
  @offset = options[:offset]
  # Only :newlog changes the mode; the documented default is 'a' (append).
  @log_write_mode = 'w' if options[:newlog]
  @skip_bulk_import = options[:skip_bulk_import]
  @read_locally = options[:read_locally]
  @rails_root = options[:rails_root]
  @log_dir = options[:log_dir]

  require File.join(@rails_root, 'config/environment') if @rails_root

  # Resolve the config path locally instead of writing it back into the
  # caller's options hash.
  config_path = options[:config] || 'database.yml'
  config_path = 'config/database.yml' unless File.exist?(config_path)
  # File.read, unlike IO.read, never treats a leading "|" as a command to run.
  # NOTE(review): on Psych 4+ YAML.load rejects YAML aliases (e.g. `<<: *default`);
  # confirm the target Ruby/Psych version if database.yml relies on them.
  database_configuration = YAML.load(ERB.new(File.read(config_path)).result + "\n")
  ActiveRecord::Base.configurations.merge!(database_configuration)
  ETL::Base.configurations = HashWithIndifferentAccess.new(database_configuration)

  # Loaded lazily: the execution models need the configuration above first.
  require 'etl/execution'
  ETL::Execution::Base.establish_connection(options[:execution_conf] || :etl_execution)
  ETL::Execution::Execution.migrate

  @initialized = true
end

.process(file) ⇒ Object

Process the specified file. Acceptable values for file are:

  • Path to a file

  • File object

  • ETL::Control::Control instance

  • ETL::Batch::Batch instance

The process command accepts either a .ctl (or .etl) Control file or a .ebf ETL Batch File.



55
56
57
# File 'lib/etl/engine.rb', line 55

# Convenience entry point: build a fresh engine instance and delegate to its
# instance-level #process.
#
# @param file [String, File, ETL::Control::Control, ETL::Batch::Batch]
# @return whatever the instance-level #process returns
def process(file)
  engine = new
  engine.process(file)
end

.table(table_name, connection) ⇒ Object

Modify the table name if necessary



179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/etl/engine.rb', line 179

# Resolve the table name that writes should target.
#
# When temp tables are disabled, +table_name+ is returned unchanged.
# Otherwise "tmp_<table_name>" is returned. The first time a given temp name
# is requested, it is recreated on +connection+ as a copy of +table_name+ and
# recorded in temp_tables, which .finish later uses to swap it into place.
#
# @param table_name name of the real destination table
# @param connection connection used to drop/copy the temp table
# @return [String] the table name to write to
def table(table_name, connection)
  return table_name unless use_temp_tables?

  tmp_name = "tmp_#{table_name}"
  if temp_tables[tmp_name].nil?
    # Best effort: the drop presumably clears a leftover temp table, and any
    # StandardError it raises (e.g. the table does not exist) is ignored.
    begin
      connection.drop_table(tmp_name)
    rescue StandardError
      nil
    end
    connection.copy_table(table_name, tmp_name)
    temp_tables[tmp_name] = { :table => table_name, :connection => connection }
  end
  tmp_name
end

.temp_tablesObject

Get a registry of temp tables



155
156
157
# File 'lib/etl/engine.rb', line 155

# Registry of temp tables: temp table name => {:table, :connection}.
# Created empty on first access and reused afterwards.
def temp_tables
  @temp_tables = {} unless @temp_tables
  @temp_tables
end

.timestampObject

Get a timestamp value as a string



88
89
90
# File 'lib/etl/engine.rb', line 88

# Current local time formatted as a 14-digit YYYYMMDDHHMMSS string.
def timestamp
  now = Time.now
  now.strftime('%Y%m%d%H%M%S')
end

.use_temp_tables?Boolean

Return true if using temp tables

Returns:

  • (Boolean)


174
175
176
# File 'lib/etl/engine.rb', line 174

# Boolean view of the use_temp_tables flag: true for any truthy value,
# false for nil/false.
def use_temp_tables?
  !!use_temp_tables
end

Instance Method Details

#benchmarksObject

Get a Hash of benchmark values where each value represents the total amount of time in seconds spent processing in that portion of the ETL pipeline. Keys include:

  • :transforms

  • :after_reads

  • :before_writes

  • :writes



248
249
250
251
252
253
254
255
# File 'lib/etl/engine.rb', line 248

# Per-stage running totals of seconds spent in the pipeline, keyed by
# :transforms, :after_reads, :before_writes and :writes (all starting at 0).
# Built once on first access and reused afterwards.
def benchmarks
  return @benchmarks if @benchmarks

  stages = [:transforms, :after_reads, :before_writes, :writes]
  @benchmarks = stages.each_with_object({}) { |stage, totals| totals[stage] = 0 }
end

#errorsObject

Array of errors encountered during execution of the ETL process



237
238
239
# File 'lib/etl/engine.rb', line 237

# Errors collected while the ETL process runs; an empty array on first access.
def errors
  return @errors if @errors

  @errors = []
end

#process(file) ⇒ Object

Process a file, control object or batch object. Acceptable values for file are:

  • Path to a file

  • File object

  • ETL::Control::Control instance

  • ETL::Batch::Batch instance



263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/etl/engine.rb', line 263

# Process a file, control object or batch object.
#
# @param file [String, File, ETL::Control::Control, ETL::Batch::Batch]
#   A String is treated as a path and opened. An open File is dispatched on
#   its extension: .ctl/.etl go to process_control and .ebf goes to
#   process_batch. Control and Batch instances are dispatched directly.
# @return the result of process_control / process_batch
# @raise [RuntimeError] for an unsupported file extension or argument type
def process(file)
  case file
  when String
    # Block form closes the handle once processing returns.
    File.open(file) { |f| process(f) }
  when File
    # Match the real extension only. Unanchored patterns such as /.ctl/
    # matched "any char + ctl" anywhere in the path, so a batch file like
    # "/data/xctl/job.ebf" was dispatched as a control file.
    case File.extname(file.path)
    when '.ctl', '.etl' then process_control(file)
    when '.ebf' then process_batch(file)
    else
      raise RuntimeError, "Unsupported file type - #{file.path}"
    end
  when ETL::Control::Control
    process_control(file)
  when ETL::Batch::Batch
    process_batch(file)
  else
    raise RuntimeError, 'Process object must be a String, File, Control ' \
                        'instance or Batch instance'
  end
end

#say(message) ⇒ Object

Say the specified message, with a newline



219
220
221
# File 'lib/etl/engine.rb', line 219

# Print +message+ followed by a newline, via say_without_newline (so output
# only appears when realtime activity is enabled).
def say(message)
  line = message + "\n"
  say_without_newline(line)
end

#say_on_own_line(message) ⇒ Object

Say the message on its own line



232
233
234
# File 'lib/etl/engine.rb', line 232

# Print +message+ on a fresh line: a leading newline, then the message
# (say adds the trailing newline).
def say_on_own_line(message)
  prefixed = "\n" + message
  say(prefixed)
end

#say_without_newline(message) ⇒ Object

Say the specified message without a newline



224
225
226
227
228
229
# File 'lib/etl/engine.rb', line 224

# Write +message+ to $stdout verbatim and flush, but only when
# ETL::Engine.realtime_activity is truthy. Otherwise nothing is written and
# nil is returned.
def say_without_newline(message)
  return unless ETL::Engine.realtime_activity

  $stdout.print(message)
  $stdout.flush
end