Class: Fluent::VerticaJsonOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_verticajson.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ VerticaJsonOutput

Returns a new instance of VerticaJsonOutput.



14
15
16
17
18
19
# File 'lib/fluent/plugin/out_verticajson.rb', line 14

# Sets up the plugin instance.
#
# The parent initializer runs first; the libraries used by this plugin are
# then loaded from inside the constructor instead of at file load time.
def initialize
  super

  # Load order is kept: the Vertica client first, then the JSON library.
  %w[vertica json].each { |library| require library }
end

Instance Method Details

#format(tag, time, record) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/fluent/plugin/out_verticajson.rb', line 21

# Serializes one event into the JSON text that is appended to the buffer.
#
# Top-level values that are a Hash or an Array are replaced by their JSON
# string representation; every other value is passed through unchanged. The
# resulting hash is then encoded as a single JSON object.
#
# @param tag event tag (not used by this method)
# @param time event time (not used by this method)
# @param record [Hash] key/value pairs of the event
# @return [String] the JSON encoding of the adjusted record
def format(tag, time, record)
  record_altered = record.each_with_object({}) do |(key, value), altered|
    # Nested containers are stored as JSON text rather than as nested JSON.
    nested = value.is_a?(Hash) || value.is_a?(Array)
    altered[key] = nested ? value.to_json : value
  end

  $log.info "New data received to the buffer for the table #{@schema}.#{@table}"
  record_altered.to_json
end

#write(chunk) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/fluent/plugin/out_verticajson.rb', line 36

# Loads the contents of a buffer chunk into the configured Vertica table.
#
# The whole chunk is read into memory and streamed to Vertica through a
# COPY ... FROM STDIN statement parsed with fjsonparser. The connection is
# closed and the cached handle cleared after every call, including when the
# load raises, so a failed flush does not leave a connection open or a
# stale handle behind for the next attempt.
#
# @param chunk buffer chunk; must respond to `open` and yield a readable IO
# @return [Object] whatever `$log.info` returns
# @raise any error raised while reading the chunk or running the COPY
def write(chunk)
  perm_table = "\"#{@schema}\".\"#{@table}\""

  # NOTE(review): the rejected-data table is neither schema-qualified nor
  # quoted, unlike perm_table - confirm this is intentional.
  copy_sql = <<-SQL
      COPY #{perm_table}
      FROM STDIN
      PARSER fjsonparser(flatten_maps=false)
      ENFORCELENGTH DIRECT
      REJECTED DATA AS TABLE #{@table}_rejected
  SQL

  begin
    chunk.open do |file|
      file_contents = file.read
      vertica.copy(copy_sql) { |handle| handle.write(file_contents) }
    end
  ensure
    # Assumes the `vertica` helper caches its connection in @vertica (the
    # original code reset that variable after closing) - TODO confirm.
    connection = @vertica
    @vertica = nil
    connection.close if connection
  end

  $log.info "Data successfully loaded to vertica table #{perm_table}."
end