Class: ETL::Processor::DatabaseJoinProcessor

Inherits:
RowProcessor show all
Defined in:
lib/etl/processor/database_join_processor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from RowProcessor

#ensure_columns_available_in_row!

Constructor Details

#initialize(control, configuration) ⇒ DatabaseJoinProcessor

Initialize the procesor.

Arguments:

  • control: The ETL::Control::Control instance

  • configuration: The configuration Hash

  • definition: The source definition

Required configuration options:

  • :target: The target connection

  • :query: The join query

  • :fields: The fields to add to the row

Raises:



19
20
21
22
23
24
25
26
27
# File 'lib/etl/processor/database_join_processor.rb', line 19

def initialize(control, configuration)
  super
  @target = configuration[:target]
  @query = configuration[:query]
  @fields = configuration[:fields]
  raise ControlError, ":target must be specified" unless @target
  raise ControlError, ":query must be specified" unless @query
  raise ControlError, ":fields must be specified" unless @fields
end

Instance Attribute Details

#fieldsObject (readonly)

Returns the value of attribute fields.



6
7
8
# File 'lib/etl/processor/database_join_processor.rb', line 6

def fields
  @fields
end

#queryObject (readonly)

Returns the value of attribute query.



5
6
7
# File 'lib/etl/processor/database_join_processor.rb', line 5

def query
  @query
end

#targetObject (readonly)

Returns the value of attribute target.



4
5
6
# File 'lib/etl/processor/database_join_processor.rb', line 4

def target
  @target
end

Instance Method Details

#process(row) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/etl/processor/database_join_processor.rb', line 34

def process(row)
  return nil if row.nil?

  q = @query
  begin
    q = eval('"' + @query + '"')
  rescue
  end

  ETL::Engine.logger.debug("Executing select: #{q}")
  res = connection.execute(q)

  # TODO - refactor this and move it (and similar code around) to adapter_extensions
  case connection.class.name
    when "ActiveRecord::ConnectionAdapters::PostgreSQLAdapter";
      res.each do |r|
        @fields.each do |field|
          row[field.to_sym] = r[field.to_s]
        end
      end
    when "ActiveRecord::ConnectionAdapters::Mysql2Adapter";
      res.each(:as => :hash) do |r|
        @fields.each do |field|
          row[field.to_sym] = r[field.to_s]
        end
      end
    when "ActiveRecord::ConnectionAdapters::MysqlAdapter";
      res.each_hash do |r|
        @fields.each do |field|
          row[field.to_sym] = r[field.to_s]
        end
      end
      res.free
    else raise "Unsupported adapter #{connection.class} for this destination"
  end

  return row
end

#to_sObject

Get a String identifier for the source



30
31
32
# File 'lib/etl/processor/database_join_processor.rb', line 30

def to_s
  "#{host}/#{database}"
end