Class: ETL::Engine

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/etl/engine.rb

Overview

The main ETL engine class

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Util

#approximate_distance_of_time_in_words, #distance_of_time_in_words

Class Attribute Details

.average_rows_per_secondObject

Accessor for the average rows per second processed



135
136
137
# File 'lib/etl/engine.rb', line 135

# Reader for the average rows per second processed; returns the value held
# in @average_rows_per_second.
def average_rows_per_second
  @average_rows_per_second
end

.batchObject

Access the current ETL::Execution::Batch instance



116
117
118
# File 'lib/etl/engine.rb', line 116

# Reader for the current ETL::Execution::Batch instance; returns the value
# held in @batch.
def batch
  @batch
end

.current_destinationObject

The current destination



94
95
96
# File 'lib/etl/engine.rb', line 94

# Reader for the current destination; returns the value held in
# @current_destination.
def current_destination
  @current_destination
end

.current_sourceObject

The current source



88
89
90
# File 'lib/etl/engine.rb', line 88

# Reader for the current source; returns the value held in @current_source.
def current_source
  @current_source
end

.current_source_rowObject

The current source row



91
92
93
# File 'lib/etl/engine.rb', line 91

# Reader for the current source row; returns the value held in
# @current_source_row.
def current_source_row
  @current_source_row
end

.jobObject

Access the current ETL::Execution::Job instance



113
114
115
# File 'lib/etl/engine.rb', line 113

# Reader for the current ETL::Execution::Job instance; returns the value
# held in @job.
def job
  @job
end

.limitObject

The limit on rows to load from the source, useful for testing the ETL process prior to executing the entire batch. Default value is nil and indicates that there is no limit



121
122
123
# File 'lib/etl/engine.rb', line 121

# Reader for the limit on rows to load from the source; returns the value
# held in @limit (nil means no limit).
def limit
  @limit
end

.log_write_modeObject

Accessor for the log write mode. Default is ‘a’ for append.



61
62
63
# File 'lib/etl/engine.rb', line 61

# Reader for the log write mode; returns the value held in @log_write_mode.
def log_write_mode
  @log_write_mode
end

.loggerObject

:nodoc:



67
68
69
# File 'lib/etl/engine.rb', line 67

# Reader for the engine logger; returns the value held in @logger.
def logger
  @logger
end

.offsetObject

The offset for the source to begin at, useful for testing the ETL process prior to executing the entire batch. Default value is nil and indicates that there is no offset



126
127
128
# File 'lib/etl/engine.rb', line 126

# Reader for the offset the source should begin at; returns the value held
# in @offset (nil means no offset).
def offset
  @offset
end

.read_locallyObject

Set to true to read locally from the last source cache files



132
133
134
# File 'lib/etl/engine.rb', line 132

# Reader for the flag that requests reading from the last source cache
# files; returns the value held in @read_locally.
def read_locally
  @read_locally
end

.realtime_activityObject

Set to true to activate realtime activity. This will cause certain information messages to be printed to STDOUT



98
99
100
# File 'lib/etl/engine.rb', line 98

# Reader for the realtime-activity flag; returns the value held in
# @realtime_activity.
def realtime_activity
  @realtime_activity
end

.rows_readObject

Accessor for the total number of rows read from sources



101
102
103
# File 'lib/etl/engine.rb', line 101

# Reader for the total number of rows read from sources; returns the value
# held in @rows_read.
def rows_read
  @rows_read
end

.rows_writtenObject

Accessor for the total number of rows processed



107
108
109
# File 'lib/etl/engine.rb', line 107

# Reader for the total number of rows processed; returns the value held in
# @rows_written.
def rows_written
  @rows_written
end

.skip_bulk_importObject

Set to true to skip all bulk importing



129
130
131
# File 'lib/etl/engine.rb', line 129

# Reader for the flag that skips all bulk importing; returns the value held
# in @skip_bulk_import.
def skip_bulk_import
  @skip_bulk_import
end

.timestamped_logObject

Returns the value of attribute timestamped_log.



58
59
60
# File 'lib/etl/engine.rb', line 58

# Reader for the timestamped_log attribute; returns the value held in
# @timestamped_log.
def timestamped_log
  @timestamped_log
end

.use_temp_tablesObject

Set to true to use temp tables



147
148
149
# File 'lib/etl/engine.rb', line 147

# Reader for the flag that enables temp tables; returns the value held in
# @use_temp_tables.
def use_temp_tables
  @use_temp_tables
end

Class Method Details

.connection(name) ⇒ Object

Get a named connection



138
139
140
141
142
143
144
# File 'lib/etl/engine.rb', line 138

# Get a named connection.
#
# Looks the name up in the connections registry, establishing and caching a
# new connection on the first request. Before handing the connection back it
# is reconnected if it reports itself as inactive.
def connection(name)
  logger.debug "Retrieving connection #{name}"

  registry = connections
  handle = registry[name]
  unless handle
    # First request for this name: create the connection and remember it.
    handle = establish_connection(name)
    registry[name] = handle
  end

  handle.reconnect! unless handle.active?
  handle
end

.finishObject

Called when a batch job finishes, allowing for cleanup to occur



155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/etl/engine.rb', line 155

# Called when a batch job finishes, allowing for cleanup to occur.
#
# For every entry in the temp table registry, swaps the temp table into
# place inside a transaction on that entry's connection: the live table is
# renamed to "<table>_old", the temp table takes the live name, and the
# "_old" table is dropped.
def finish
  temp_tables.each_pair do |staging_name, entry|
    live_name = entry[:table]
    db = entry[:connection]
    retired_name = "#{live_name}_old"

    db.transaction do
      db.rename_table(live_name, retired_name)
      db.rename_table(staging_name, live_name)
      db.drop_table(retired_name)
    end
  end
end

.init(options = {}) ⇒ Object

Initialization that is run when a job is executed.

Options:

  • :limit: Limit the number of records returned from sources

  • :offset: Specify the record offset at which reading from sources begins

  • :newlog: If true, the log write mode is set to ‘w’ so the log is overwritten; otherwise the write mode is left unchanged (append by default)

  • :skip_bulk_import: Set to true to skip bulk import

  • :read_locally: Set to true to read from the local cache

  • :rails_root: Set to the rails root to boot rails



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/etl/engine.rb', line 20

# Initialization that is run when a job is executed. Only the first call
# does any work; later calls return immediately.
#
# Options read:
# * :limit, :offset - stored as the source row limit and offset
# * :newlog - when truthy, the log write mode is set to 'w'
# * :skip_bulk_import, :read_locally - stored as given
# * :rails_root - when set, config/environment beneath it is required
# * :config - path of the database configuration; defaults to
#   'database.yml' and is replaced by 'config/database.yml' when the chosen
#   path does not exist. The resolved value is written back into +options+.
def init(options={})
  return if @initialized

  puts "initializing ETL engine\n\n"
  @limit = options[:limit]
  @offset = options[:offset]
  if options[:newlog]
    @log_write_mode = 'w'
  end
  @skip_bulk_import = options[:skip_bulk_import]
  @read_locally = options[:read_locally]
  @rails_root = options[:rails_root]

  if @rails_root
    require File.join(@rails_root, 'config/environment')
  end

  options[:config] = 'database.yml' unless options[:config]
  unless File.exist?(options[:config])
    options[:config] = 'config/database.yml'
  end

  # The configuration file is run through ERB before being parsed as YAML.
  rendered_yaml = ERB.new(IO.read(options[:config])).result + "\n"
  db_settings = YAML::load(rendered_yaml)
  ActiveRecord::Base.configurations.merge!(db_settings)
  ETL::Base.configurations = db_settings

  require 'etl/execution'
  ETL::Execution::Base.establish_connection :etl_execution
  ETL::Execution::Execution.migrate

  @initialized = true
end

.process(file) ⇒ Object

Process the specified file. Acceptable values for file are:

  • Path to a file

  • File object

  • ETL::Control::Control instance

  • ETL::Batch::Batch instance

The process command will accept either a .ctl Control file or a .ebf ETL Batch File.



54
55
56
# File 'lib/etl/engine.rb', line 54

# Process the specified file by building a fresh engine instance and
# delegating to its #process method; returns whatever that call returns.
def process(file)
  engine = new()
  engine.process(file)
end

.table(table_name, connection) ⇒ Object

Modify the table name if necessary



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/etl/engine.rb', line 174

# Modify the table name if necessary.
#
# When temp tables are not in use the given table_name is returned as is.
# Otherwise the name "tmp_<table_name>" is returned; the first time that
# name is requested the temp table is created as a copy of table_name on the
# given connection and recorded in the temp table registry together with the
# real table name and the connection.
#
# table_name - name of the real table
# connection - object responding to drop_table and copy_table
def table(table_name, connection)
  return table_name unless use_temp_tables?

  temp_table_name = "tmp_#{table_name}"
  if temp_tables[temp_table_name].nil?
    # Create the temp table and add it to the mapping
    begin
      connection.drop_table(temp_table_name)
    rescue StandardError
      # Best effort: a failed drop (e.g. no leftover table) is ignored on
      # purpose so the copy below can still proceed.
    end
    connection.copy_table(table_name, temp_table_name)
    temp_tables[temp_table_name] = {
      :table => table_name,
      :connection => connection
    }
  end
  temp_table_name
end

.temp_tablesObject

Get a registry of temp tables



150
151
152
# File 'lib/etl/engine.rb', line 150

# Get a registry of temp tables: a Hash created empty on first access and
# reused on every later call.
def temp_tables
  @temp_tables = {} unless @temp_tables
  @temp_tables
end

.timestampObject

Get a timestamp value as a string



83
84
85
# File 'lib/etl/engine.rb', line 83

# Get a timestamp value as a string: the current time formatted as year,
# month, day, hour, minute and second with no separators.
def timestamp
  moment = Time.now
  moment.strftime("%Y%m%d%H%M%S")
end

.use_temp_tables?Boolean

Return true if using temp tables

Returns:

  • (Boolean)


169
170
171
# File 'lib/etl/engine.rb', line 169

# Return true if using temp tables. The use_temp_tables setting is coerced
# so the result is always exactly true or false.
def use_temp_tables?
  !!use_temp_tables
end

Instance Method Details

#benchmarksObject

Get a Hash of benchmark values where each value represents the total amount of time in seconds spent processing in that portion of the ETL pipeline. Keys include:

  • :transforms

  • :after_reads

  • :before_writes

  • :writes



239
240
241
242
243
244
245
246
# File 'lib/etl/engine.rb', line 239

# Get a Hash of benchmark values, one per portion of the ETL pipeline
# (:transforms, :after_reads, :before_writes, :writes). The Hash is built
# with every value at 0 on first access and the same Hash is returned on
# later calls.
def benchmarks
  return @benchmarks if @benchmarks

  stages = [:transforms, :after_reads, :before_writes, :writes]
  @benchmarks = Hash[stages.map { |stage| [stage, 0] }]
end

#errorsObject

Array of errors encountered during execution of the ETL process



228
229
230
# File 'lib/etl/engine.rb', line 228

# Array of errors encountered during execution of the ETL process; created
# empty on first access and reused afterwards.
def errors
  @errors = [] unless @errors
  @errors
end

#process(file) ⇒ Object

Process a file, control object or batch object. Acceptable values for file are:

  • Path to a file

  • File object

  • ETL::Control::Control instance

  • ETL::Batch::Batch instance



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/etl/engine.rb', line 254

# Process a file, control object or batch object.
#
# file may be:
# * a String path - the file is opened and processed as a File
# * a File - handed to process_control when its path ends in ".ctl", or to
#   process_batch when its path ends in ".ebf"
# * an ETL::Control::Control instance - handed to process_control
# * an ETL::Batch::Batch instance - handed to process_batch
#
# Raises RuntimeError for any other kind of object.
def process(file)
  case file
  when String
    # Block form so the handle opened here is closed once processing ends.
    File.open(file) { |opened| process(opened) }
  when File
    # The dot is escaped and the match anchored at the end of the string, so
    # only a literal ".ctl" / ".ebf" extension selects a processor.
    process_control(file) if file.path =~ /\.ctl\z/
    process_batch(file) if file.path =~ /\.ebf\z/
  when ETL::Control::Control
    process_control(file)
  when ETL::Batch::Batch
    process_batch(file)
  else
    raise RuntimeError, "Process object must be a String, File, Control instance or Batch instance"
  end
end

#say(message) ⇒ Object

Say the specified message, with a newline



210
211
212
# File 'lib/etl/engine.rb', line 210

# Say the specified message, with a newline appended, by passing it on to
# say_without_newline.
def say(message)
  line = message + "\n"
  say_without_newline(line)
end

#say_on_own_line(message) ⇒ Object

Say the message on its own line



223
224
225
# File 'lib/etl/engine.rb', line 223

# Say the message on its own line: a newline is put in front of it and the
# result is passed to say.
def say_on_own_line(message)
  prefixed = "\n" + message
  say(prefixed)
end

#say_without_newline(message) ⇒ Object

Say the specified message without a newline



215
216
217
218
219
220
# File 'lib/etl/engine.rb', line 215

# Say the specified message without a newline. Nothing is written unless
# ETL::Engine.realtime_activity is set; when it is, the message is printed
# to $stdout and the stream is flushed straight away.
def say_without_newline(message)
  return unless ETL::Engine.realtime_activity

  $stdout.print message
  $stdout.flush
end