Class: ETL::Engine

Inherits:
Object
Includes:
Util
Defined in:
lib/etl/engine.rb

Overview

The main ETL engine class.

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Util

#approximate_distance_of_time_in_words, #distance_of_time_in_words

Class Attribute Details

.average_rows_per_secondObject

Accessor for the average rows per second processed



138
139
140
# File 'lib/etl/engine.rb', line 138

# Average number of rows processed per second (nil until assigned).
def average_rows_per_second; @average_rows_per_second; end

.batchObject

Access the current ETL::Execution::Batch instance



119
120
121
# File 'lib/etl/engine.rb', line 119

# The current ETL::Execution::Batch instance (nil until assigned).
def batch; @batch; end

.current_destinationObject

The current destination



97
98
99
# File 'lib/etl/engine.rb', line 97

# The destination currently being written to (nil until assigned).
def current_destination; @current_destination; end

.current_sourceObject

The current source



91
92
93
# File 'lib/etl/engine.rb', line 91

# The source currently being read from (nil until assigned).
def current_source; @current_source; end

.current_source_rowObject

The current source row



94
95
96
# File 'lib/etl/engine.rb', line 94

# The source row currently being processed (nil until assigned).
def current_source_row; @current_source_row; end

.exit_codeObject

exit code to be passed to the command line



88
89
90
# File 'lib/etl/engine.rb', line 88

# Exit code to be passed back to the command line (nil until assigned).
def exit_code; @exit_code; end

.jobObject

Access the current ETL::Execution::Job instance



116
117
118
# File 'lib/etl/engine.rb', line 116

# The current ETL::Execution::Job instance (nil until assigned).
def job; @job; end

.limitObject

The limit on rows to load from the source, useful for testing the ETL process prior to executing the entire batch. Default value is nil and indicates that there is no limit



124
125
126
# File 'lib/etl/engine.rb', line 124

# Maximum number of rows to load from a source; nil means no limit.
def limit; @limit; end

.log_write_modeObject

Accessor for the log write mode. Default is ‘a’ for append.



61
62
63
# File 'lib/etl/engine.rb', line 61

# File mode used when writing the log ('w' is set by init when :newlog is given).
def log_write_mode; @log_write_mode; end

.loggerObject

:nodoc:



67
68
69
# File 'lib/etl/engine.rb', line 67

# The engine's logger (internal; nil until assigned).
def logger; @logger; end

.offsetObject

The offset for the source to begin at, useful for testing the ETL process prior to executing the entire batch. Default value is nil and indicates that there is no offset



129
130
131
# File 'lib/etl/engine.rb', line 129

# Offset at which sources begin reading; nil means no offset.
def offset; @offset; end

.read_locallyObject

Set to true to read locally from the last source cache files



135
136
137
# File 'lib/etl/engine.rb', line 135

# When truthy, sources read from the last local cache files.
def read_locally; @read_locally; end

.realtime_activityObject

Set to true to activate realtime activity. This will cause certain information messages to be printed to STDOUT



101
102
103
# File 'lib/etl/engine.rb', line 101

# When truthy, informational progress messages are printed to STDOUT.
def realtime_activity; @realtime_activity; end

.rows_readObject

Accessor for the total number of rows read from sources



104
105
106
# File 'lib/etl/engine.rb', line 104

# Total number of rows read from sources (nil until assigned).
def rows_read; @rows_read; end

.rows_writtenObject

Accessor for the total number of rows processed



110
111
112
# File 'lib/etl/engine.rb', line 110

# Total number of rows processed/written (nil until assigned).
def rows_written; @rows_written; end

.skip_bulk_importObject

Set to true to skip all bulk importing



132
133
134
# File 'lib/etl/engine.rb', line 132

# When truthy, all bulk importing is skipped.
def skip_bulk_import; @skip_bulk_import; end

.timestamped_logObject

Returns the value of attribute timestamped_log.



58
59
60
# File 'lib/etl/engine.rb', line 58

# Value of the timestamped_log attribute (nil until assigned).
def timestamped_log; @timestamped_log; end

.use_temp_tablesObject

Set to true to use temp tables



150
151
152
# File 'lib/etl/engine.rb', line 150

# When truthy, loads go through temp tables (see use_temp_tables?).
def use_temp_tables; @use_temp_tables; end

Class Method Details

.connection(name) ⇒ Object

Get a named connection



141
142
143
144
145
146
147
# File 'lib/etl/engine.rb', line 141

# Fetch the named connection, establishing and caching it on first use.
#
# @param name the connection name (key into the connections registry)
# @return the cached connection, reconnected first if it is not active
def connection(name)
  logger.debug "Retrieving connection #{name}"
  registry = connections
  handle = registry[name] || (registry[name] = establish_connection(name))
  # A cached connection may have gone stale since it was last used.
  handle.reconnect! unless handle.active?
  handle
end

.finishObject

Called when a batch job finishes, allowing for cleanup to occur



158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/etl/engine.rb', line 158

# Called when a batch job finishes, allowing for cleanup to occur.
#
# For every registered temp table, swaps it into place within a transaction
# on that table's connection:
#   1. rename the real table to "<table>_old"
#   2. rename the temp table to the real name
#   3. drop "<table>_old"
#
# Each entry is removed from the temp_tables registry once it has been
# swapped. Otherwise a later finish would rename the live table away again,
# and table() would keep returning a tmp_ name that no longer exists.
def finish
  # Iterate a snapshot so entries can be deleted from the registry as we go.
  temp_tables.to_a.each do |temp_table, mapping|
    actual_table = mapping[:table]
    conn = mapping[:connection]
    conn.transaction do
      conn.rename_table(actual_table, "#{actual_table}_old")
      conn.rename_table(temp_table, actual_table)
      conn.drop_table("#{actual_table}_old")
    end
    # Only forget the entry after a successful swap; failed ones stay recorded.
    temp_tables.delete(temp_table)
  end
end

.init(options = {}) ⇒ Object

Initialization that is run when a job is executed.

Options:

  • :limit: Limit the number of records returned from sources

  • :offset: Offset at which to begin reading records from sources

  • :newlog: If true, sets the log write mode to 'w' (overwrite); otherwise the default mode 'a' (append) is used

  • :skip_bulk_import: Set to true to skip bulk import

  • :read_locally: Set to true to read from the local cache

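  • :rails_root: Set to the rails root to boot rails

  • :config: Path to the database configuration file (default database.yml; falls back to config/database.yml if that file does not exist)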



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/etl/engine.rb', line 20

# Initialization that is run when a job is executed. Only the first call has
# any effect; later calls return nil immediately.
#
# Options:
# * :limit            - limit the number of records returned from sources
# * :offset           - offset at which sources begin reading
# * :newlog           - if truthy, the log write mode is set to 'w'
# * :skip_bulk_import - set to true to skip bulk import
# * :read_locally     - set to true to read from the local cache
# * :rails_root       - rails root; its config/environment is required to boot rails
# * :config           - database config path (default 'database.yml', falling
#                       back to 'config/database.yml' if that file is missing).
#                       Note: this key is written back into +options+.
def init(options={})
  return if @initialized

  puts "initializing ETL engine\n\n"
  @limit = options[:limit]
  @offset = options[:offset]
  @log_write_mode = 'w' if options[:newlog]
  @skip_bulk_import = options[:skip_bulk_import]
  @read_locally = options[:read_locally]
  @rails_root = options[:rails_root]

  require File.join(@rails_root, 'config/environment') if @rails_root
  options[:config] ||= 'database.yml'
  options[:config] = 'config/database.yml' unless File.exist?(options[:config])
  # File.read (unlike IO.read) never interprets a leading "|" in the path as
  # a command to spawn, so a crafted config path cannot run a subprocess.
  # The config is rendered through ERB first, as Rails does for database.yml.
  # NOTE(review): on Psych 4 (Ruby 3.1+) YAML::load is safe_load and rejects
  # aliases such as `<<: *default` unless `aliases: true` is passed — confirm.
  database_configuration = YAML::load(ERB.new(File.read(options[:config])).result + "\n")
  ActiveRecord::Base.configurations.merge!(database_configuration)
  ETL::Base.configurations = HashWithIndifferentAccess.new(database_configuration)

  # Loaded lazily so the execution models see the configurations merged above.
  require 'etl/execution'
  ETL::Execution::Base.establish_connection :etl_execution
  ETL::Execution::Execution.migrate

  @initialized = true
end

.process(file) ⇒ Object

Process the specified file. Acceptable values for file are:

  • Path to a file

  • File object

  • ETL::Control::Control instance

  • ETL::Batch::Batch instance

The process command will accept either a .ctl or .ctl.rb file for a Control file, or a .ebf or .ebf.rb file for an ETL Batch File.


54
55
56
# File 'lib/etl/engine.rb', line 54

# Process +file+ with a freshly constructed engine instance.
#
# @param file a path, File, ETL::Control::Control or ETL::Batch::Batch
# @return whatever the instance-level #process returns
def process(file)
  engine = new
  engine.process(file)
end

.table(table_name, connection) ⇒ Object

Modify the table name if necessary



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/etl/engine.rb', line 177

# Resolve the table name to load into.
#
# Without temp tables the name is returned unchanged. With temp tables, the
# first request for a table (re)creates "tmp_<table>" as a copy of the real
# table and registers it in temp_tables so that finish can swap it in later.
#
# @param table_name the real destination table
# @param connection the connection used to drop/copy the temp table
# @return the table name to write to
def table(table_name, connection)
  return table_name unless use_temp_tables?

  tmp_name = "tmp_#{table_name}"
  if temp_tables[tmp_name].nil?
    # Best-effort drop of a leftover temp table; any StandardError is ignored.
    begin
      connection.drop_table(tmp_name)
    rescue StandardError
      nil
    end
    connection.copy_table(table_name, tmp_name)
    temp_tables[tmp_name] = { :table => table_name, :connection => connection }
  end
  tmp_name
end

.temp_tablesObject

Get a registry of temp tables



153
154
155
# File 'lib/etl/engine.rb', line 153

# Registry of temp tables: temp table name => { :table, :connection }.
# Lazily created on first access.
def temp_tables
  @temp_tables = {} unless @temp_tables
  @temp_tables
end

.timestampObject

Get a timestamp value as a string



83
84
85
# File 'lib/etl/engine.rb', line 83

# Current local time formatted as a compact YYYYMMDDHHMMSS string.
def timestamp
  now = Time.now
  now.strftime("%Y%m%d%H%M%S")
end

.use_temp_tables?Boolean

Return true if using temp tables

Returns:

  • (Boolean)


172
173
174
# File 'lib/etl/engine.rb', line 172

# Boolean form of use_temp_tables: true for any truthy setting, else false.
def use_temp_tables?
  !!use_temp_tables
end

Instance Method Details

#benchmarksObject

Get a Hash of benchmark values where each value represents the total amount of time in seconds spent processing in that portion of the ETL pipeline. Keys include:

  • :transforms

  • :after_reads

  • :before_writes

  • :writes



254
255
256
257
258
259
260
261
# File 'lib/etl/engine.rb', line 254

# Accumulated seconds spent in each pipeline stage, keyed by
# :transforms, :after_reads, :before_writes and :writes (all starting at 0).
# The same Hash is returned on every call so callers can add to it.
def benchmarks
  return @benchmarks if @benchmarks
  @benchmarks = {
    :transforms => 0,
    :after_reads => 0,
    :before_writes => 0,
    :writes => 0
  }
end

#errorsObject

Array of errors encountered during execution of the ETL process



235
236
237
# File 'lib/etl/engine.rb', line 235

# Errors collected while running the ETL process (lazily created Array).
def errors
  return @errors if @errors
  @errors = []
end

#process(file) ⇒ Object

Process a file, control object or batch object. Acceptable values for file are:

  • Path to a file

  • File object

  • ETL::Control::Control instance

  • ETL::Batch::Batch instance



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/etl/engine.rb', line 269

# Process a file, control object or batch object.
#
# @param file one of:
#   * String path  - opened and processed as a File (the handle is closed
#                    once processing returns)
#   * File         - dispatched by extension: .ctl/.ctl.rb -> control,
#                    .ebf/.ebf.rb -> batch
#   * ETL::Control::Control instance
#   * ETL::Batch::Batch instance
# @raise [RuntimeError] for an unsupported file extension or argument type
def process(file)
  case file
  when String
    # Block form closes the handle; File.new previously leaked it.
    File.open(file) { |f| process(f) }
  when File
    case file.path
    when /\.ctl(\.rb)?\z/ then process_control(file)
    when /\.ebf(\.rb)?\z/ then process_batch(file)
    else raise RuntimeError, "Unsupported file type - #{file.path}"
    end
  when ETL::Control::Control
    process_control(file)
  when ETL::Batch::Batch
    process_batch(file)
  else
    raise RuntimeError, "Process object must be a String, File, Control instance or Batch instance"
  end
end

#say(message) ⇒ Object

Say the specified message, with a newline



217
218
219
# File 'lib/etl/engine.rb', line 217

# Emit +message+ followed by a newline (via say_without_newline).
def say(message)
  line = message + "\n"
  say_without_newline(line)
end

#say_on_own_line(message) ⇒ Object

Say the message on its own line



230
231
232
# File 'lib/etl/engine.rb', line 230

# Emit +message+ preceded by a newline, so it starts on a fresh line.
def say_on_own_line(message)
  text = "\n" + message
  say(text)
end

#say_without_newline(message) ⇒ Object

Say the specified message without a newline



222
223
224
225
226
227
# File 'lib/etl/engine.rb', line 222

# Print +message+ as-is to $stdout and flush, but only when
# ETL::Engine.realtime_activity is truthy; otherwise do nothing.
def say_without_newline(message)
  return unless ETL::Engine.realtime_activity
  $stdout.print(message)
  $stdout.flush
end

#track_error(control, msg) ⇒ Object

First attempt at centralizing error notifications



240
241
242
243
244
245
# File 'lib/etl/engine.rb', line 240

# Record +msg+ in errors, then pass it to each of the control's error handlers
# in order.
def track_error(control, msg)
  errors.push(msg)
  control.error_handlers.each { |handler| handler.call(msg) }
end