Class: ETL::Control::FileDestination

Inherits:
Destination show all
Defined in:
lib/etl/control/destination/file_destination.rb

Overview

File as the final destination.

Instance Attribute Summary collapse

Attributes inherited from Destination

#append_rows, #buffer_size, #condition, #configuration, #control, #mapping, #unique

Instance Method Summary collapse

Methods inherited from Destination

class_for_name, #current_row, #errors, #write

Constructor Details

#initialize(control, configuration, mapping = {}) ⇒ FileDestination

Initialize the object.

  • control: The Control object

  • configuration: The configuration map

  • mapping: The output mapping

Configuration options:

  • :file: The file to write to (REQUIRED)

  • :append: Set to true to append to the file (default is to overwrite)

  • :separator: Separator written between the values of a record (default is a comma)

  • :eol: End of line marker (default is "\n")

  • :enclose: Enclosure character (default is none)

  • :unique: Array of field names used to write only unique records (the SCD required fields are added to this list)

  • :append_rows: Array of rows to append

Mapping options:

  • :order: The order array

Raises:

  • ControlError: if the mapping has no :order and no order can be derived from the source

41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/etl/control/destination/file_destination.rb', line 41

# Build a file destination.
#
# * control: the Control object; the directory of its +file+ is the base
#   for a relative target path
# * configuration: the configuration map (<tt>:file</tt>, <tt>:append</tt>,
#   <tt>:separator</tt>, <tt>:eol</tt>, <tt>:enclose</tt>, <tt>:unique</tt>)
# * mapping: the output mapping (<tt>:order</tt>)
#
# Defaults for <tt>:append</tt> (false), <tt>:separator</tt> (',') and
# <tt>:eol</tt> ("\n") are stored back into +configuration+ when absent.
#
# Raises ControlError when neither the mapping nor order_from_source
# yields an output order.
def initialize(control, configuration, mapping={})
  super

  # A relative target is anchored to the directory holding the control file.
  target = Pathname.new(configuration[:file])
  unless target.absolute?
    control_dir = File.dirname(File.expand_path(control.file))
    target = Pathname.new(control_dir) + target
  end
  @file = target

  # Output formatting options; missing ones are defaulted in place.
  @append = (configuration[:append] ||= false)
  @separator = (configuration[:separator] ||= ',')
  @eol = (configuration[:eol] ||= "\n")
  @enclose = configuration[:enclose]

  # The unique field list, when given, is extended with the SCD required fields.
  unique_fields = configuration[:unique]
  unique_fields = unique_fields + scd_required_fields if unique_fields
  @unique = unique_fields
  @unique.uniq! unless @unique.nil?

  # An explicit order is extended the same way; otherwise ask the source.
  if mapping[:order]
    @order = mapping[:order] + scd_required_fields
  else
    @order = order_from_source
  end
  @order.uniq! unless @order.nil?

  raise ControlError, "Order required in mapping" unless @order
end

Instance Attribute Details

#append ⇒ Object

Flag which indicates to append (default is to overwrite)



14
15
16
# File 'lib/etl/control/destination/file_destination.rb', line 14

# Reader for the append flag: returns the current value of @append.
# The constructor sets it from configuration[:append], defaulting to false.
def append
  @append
end

#enclose ⇒ Object

The enclosure character



23
24
25
# File 'lib/etl/control/destination/file_destination.rb', line 23

# Reader for the enclosure character: returns the current value of @enclose.
# The constructor sets it from configuration[:enclose]; nil means values are
# written without an enclosure.
def enclose
  @enclose
end

#eol ⇒ Object

The end of line marker



20
21
22
# File 'lib/etl/control/destination/file_destination.rb', line 20

# Reader for the end-of-line marker: returns the current value of @eol.
# The constructor sets it from configuration[:eol], defaulting to "\n".
def eol
  @eol
end

#file ⇒ Object (readonly)

The File to write to



8
9
10
# File 'lib/etl/control/destination/file_destination.rb', line 8

# Reader for the target file: returns @file, the Pathname the constructor
# built from configuration[:file] (relative paths are resolved against the
# directory of the control file).
def file
  @file
end

#order ⇒ Object (readonly)

The output order



11
12
13
# File 'lib/etl/control/destination/file_destination.rb', line 11

# Reader for the output order: returns @order, the list of field names that
# decides which values are written for each row and in what sequence.
def order
  @order
end

#separator ⇒ Object

The separator



17
18
19
# File 'lib/etl/control/destination/file_destination.rb', line 17

# Reader for the separator: returns the current value of @separator.
# The constructor sets it from configuration[:separator], defaulting to ','.
def separator
  @separator
end

Instance Method Details

#close ⇒ Object

Close the destination. This will flush the buffer and close the underlying stream or connection.



57
58
59
60
61
# File 'lib/etl/control/destination/file_destination.rb', line 57

# Close the destination.
#
# Any configured append_rows are pushed onto the buffer first, so they are
# written by the final flush; the underlying file handle is closed afterwards.
# Returns whatever closing the handle returns.
def close
  if append_rows
    buffer << append_rows
  end

  flush
  f.close
end

#flush ⇒ Object

Flush the destination buffer



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/etl/control/destination/file_destination.rb', line 64

# Flush the destination buffer.
#
# Every buffered row (nested arrays are flattened) that passes row_allowed?
# is written to the file as one line: the values named by +order+, joined by
# +separator+ and terminated by +eol+. The file handle is then flushed and the
# buffer emptied.
#
# Each value is converted to text and escaped before it is written:
# * backslashes are doubled
# * occurrences of the separator are prefixed with a backslash
# * carriage returns and newlines are removed
# * when +enclose+ is set, occurrences of it inside the value are prefixed
#   with a backslash and the value is wrapped in it
#
# The separator and the enclosure are matched as literal text, so characters
# that are special in a regular expression or in a gsub replacement string
# (for example "|", ".", "*" or "&") are escaped correctly.
#
# Returns the result of clearing the buffer.
def flush
  buffer.flatten.each do |row|
    # check to see if this row's compound key constraint already exists
    # note that the compound key constraint may not utilize virtual fields
    next unless row_allowed?(row)

    # add any virtual fields
    add_virtuals!(row)

    # collect all of the values using the order designated in the configuration
    fields = order.collect do |name|
      value = row[name]
      case value
      when Date, Time, DateTime
        # NOTE(review): relies on to_s accepting a format name, presumably the
        # ActiveSupport core extension -- confirm it is loaded by the project
        value.to_s(:db)
      else
        value.to_s
      end
    end

    fields.collect! do |text|
      # block replacements are inserted verbatim, so nothing in them is
      # re-interpreted as a back-reference
      text = text.gsub('\\') { '\\\\' }
      text = text.gsub(separator) { "\\#{separator}" }
      text = text.gsub(/\n|\r/, '')

      # enclose the value if required
      unless enclose.nil?
        text = enclose + text.gsub(enclose) { "\\#{enclose}" } + enclose
      end
      text
    end

    # write the values joined by the separator defined in the configuration
    f.write(fields.join(separator))

    # write the end-of-line
    f.write(eol)
  end
  f.flush
  buffer.clear
end