Class: Fluent::VerticaJsonOutput
- Inherits: BufferedOutput
- Inheritance chain (root first): Object → BufferedOutput → Fluent::VerticaJsonOutput
- Defined in:
- lib/fluent/plugin/out_verticajson.rb
Instance Method Summary
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ VerticaJsonOutput
constructor
A new instance of VerticaJsonOutput.
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ VerticaJsonOutput
Returns a new instance of VerticaJsonOutput.
14 15 16 17 18 19 |
# File 'lib/fluent/plugin/out_verticajson.rb', line 14
# Creates the plugin instance: runs the parent class initializer first and
# then loads the 'vertica' and 'json' libraries, in that order.
def initialize
  super
  # Loaded at construction time rather than at file load time.
  %w[vertica json].each { |library| require library }
end
Instance Method Details
#format(tag, time, record) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/fluent/plugin/out_verticajson.rb', line 21
# Serializes one event record to the JSON text that is handed to the buffer.
#
# Hash and Array values are replaced by their JSON text, so nested data ends
# up as a JSON-encoded string instead of a nested object/array; every other
# value is passed through unchanged. An info-level message naming the target
# table is logged for every record.
#
# @param tag [Object] event tag (not used by this method)
# @param time [Object] event time (not used by this method)
# @param record [Hash] field name => value pairs of the event
# @return [String] the flattened record encoded as JSON
def format(tag, time, record)
  flattened = record.each_with_object({}) do |(key, value), result|
    result[key] = case value
                  when Hash, Array then value.to_json # already a String, no interpolation needed
                  else value
                  end
  end

  $log.info "New data received to the buffer for the table #{@schema}.#{@table}"
  flattened.to_json
end
#write(chunk) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/fluent/plugin/out_verticajson.rb', line 36
# Loads one buffer chunk into the Vertica table "<@schema>"."<@table>" with a
# COPY ... FROM STDIN statement parsed by fjsonparser.
#
# The chunk is read fully into memory and streamed to the COPY handle. The
# connection is closed and the cached @vertica reference is cleared after
# every attempt, including failed ones, so a failed COPY does not leave the
# used connection open and cached for the next attempt.
#
# @param chunk [#open] buffer chunk; `open` must yield an object responding to `read`
# @return [Object] whatever `$log.info` returns
# @raise whatever `vertica`, `copy` or `close` raise; the success message is
#   not logged in that case
def write(chunk)
  perm_table = "\"#{@schema}\".\"#{@table}\""

  chunk.open do |file|
    file_contents = file.read
    # Fetch the connection once so that a failure while connecting is not
    # retried (and re-raised) by the cleanup code below.
    # presumably `vertica` memoizes the connection in @vertica -- verify against its definition
    connection = vertica

    begin
      # NOTE(review): the rejected-data table is neither schema-qualified nor
      # quoted, unlike perm_table -- confirm this is intended.
      connection.copy(<<-SQL) { |handle| handle.write(file_contents) }
        COPY #{perm_table}
        FROM STDIN
        PARSER fjsonparser(flatten_maps=false)
        ENFORCELENGTH
        DIRECT
        REJECTED DATA AS TABLE #{@table}_rejected
      SQL
    ensure
      # Drop the cached reference first so it is cleared even if close raises.
      @vertica = nil
      connection.close
    end
  end

  $log.info "Data successfully loaded to vertica table #{perm_table}."
end