Class: ETL::Processor::CheckExistProcessor

Inherits:
RowProcessor show all
Defined in:
lib/etl/processor/check_exist_processor.rb

Overview

A row-level processor that checks if the row already exists in the target table

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from RowProcessor

#ensure_columns_available_in_row!

Constructor Details

#initialize(control, configuration) ⇒ CheckExistProcessor

Initialize the processor Configuration options:

  • :columns: An array of symbols for columns that should be included in the query conditions. If this option is not specified then all of the columns in the row will be included in the conditions (unless :skip is specified).

  • :skip: A symbol or array of symbols that should not be included in the existence check. If this option is not specified then all of the columns will be included in the existence check (unless :columns is specified).

  • :target: The target connection

  • :table: The table name



28
29
30
31
32
33
34
35
36
37
# File 'lib/etl/processor/check_exist_processor.rb', line 28

def initialize(control, configuration)
  super
  @skip = configuration[:skip] || []
  @target = configuration[:target] || raise(ETL::ControlError, "target must be specified")
  @table = configuration[:table] || raise(ETL::ControlError, "table must be specified")
  @columns = configuration[:columns]
  
  q = "SELECT COUNT(*) FROM #{table_name}"
  @should_check = ETL::Engine.connection(target).select_value(q).to_i > 0 
end

Instance Attribute Details

#columnsObject

An array of columns representing the natural key



16
17
18
# File 'lib/etl/processor/check_exist_processor.rb', line 16

def columns
  @columns
end

#should_checkObject

Is set to true if the processor should execute the check. If there are no rows in the target table then this should return false.



20
21
22
# File 'lib/etl/processor/check_exist_processor.rb', line 20

def should_check
  @should_check
end

#skipObject

A symbol or array of symbols representing keys that should be skipped



7
8
9
# File 'lib/etl/processor/check_exist_processor.rb', line 7

def skip
  @skip
end

#tableObject

The name of the table to check against



13
14
15
# File 'lib/etl/processor/check_exist_processor.rb', line 13

def table
  @table
end

#targetObject

The target database



10
11
12
# File 'lib/etl/processor/check_exist_processor.rb', line 10

def target
  @target
end

Instance Method Details

#process(row) ⇒ Object

Process the row



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/etl/processor/check_exist_processor.rb', line 55

def process(row)
  return row unless should_check?
  conn = ETL::Engine.connection(target)
  q = "SELECT * FROM #{table_name} WHERE "
  conditions = []
  ensure_columns_available_in_row!(row, columns, 'for existence check')
  row.each do |k,v| 
    if columns.nil? || columns.include?(k.to_sym)
      conditions << "#{k} = #{conn.quote(v)}" unless skip?(k.to_sym)
    end
  end
  q << conditions.join(" AND ")
  q << " LIMIT 1"

  result = conn.select_one(q)
  return row if result.nil?
end

#should_check?Boolean

Return true if the row should be checked

Returns:

  • (Boolean)


50
51
52
# File 'lib/etl/processor/check_exist_processor.rb', line 50

def should_check?
  @should_check ? true : false
end

#skip?(key) ⇒ Boolean

Return true if the given key should be skipped

Returns:

  • (Boolean)


40
41
42
43
44
45
46
47
# File 'lib/etl/processor/check_exist_processor.rb', line 40

def skip?(key)
  case skip
  when Array
    skip.include?(key)
  else
    skip.to_sym == key.to_sym
  end
end