Class: Fluent::PostgresOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_postgres.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializePostgresOutput

Returns a new instance of PostgresOutput.



22
23
24
25
# File 'lib/fluent/plugin/out_postgres.rb', line 22

def initialize
  super
  require 'pg'
end

Instance Attribute Details

#handlerObject

Returns the value of attribute handler.



20
21
22
# File 'lib/fluent/plugin/out_postgres.rb', line 20

def handler
  @handler
end

Instance Method Details

#clientObject



58
59
60
61
62
63
64
# File 'lib/fluent/plugin/out_postgres.rb', line 58

def client
  PG::Connection.new({
    :host => @host, :port => @port,
    :user => @username, :password => @password,
    :dbname => @database
  })
end

#configure(conf) ⇒ Object

We don’t currently support mysql’s analogous json format



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluent/plugin/out_postgres.rb', line 28

def configure(conf)
  super

  if @format == 'json'
    @format_proc = Proc.new{|tag, time, record| record.to_json}
  else
    @key_names = @key_names.split(',')
    @format_proc = Proc.new{|tag, time, record| @key_names.map{|k| record[k]}}
  end

  if @columns.nil? and @sql.nil?
    raise Fluent::ConfigError, "columns or sql MUST be specified, but missing"
  end
  if @columns and @sql
    raise Fluent::ConfigError, "both of columns and sql are specified, but specify one of them"
  end
end

#format(tag, time, record) ⇒ Object



54
55
56
# File 'lib/fluent/plugin/out_postgres.rb', line 54

def format(tag, time, record)
  [tag, time, @format_proc.call(tag, time, record)].to_msgpack
end

#shutdownObject



50
51
52
# File 'lib/fluent/plugin/out_postgres.rb', line 50

def shutdown
  super
end

#startObject



46
47
48
# File 'lib/fluent/plugin/out_postgres.rb', line 46

def start
  super
end

#write(chunk) ⇒ Object



66
67
68
69
70
71
72
73
# File 'lib/fluent/plugin/out_postgres.rb', line 66

def write(chunk)
  handler = self.client
  handler.prepare("write", @sql)
  chunk.msgpack_each { |tag, time, data|
    handler.exec_prepared("write", data)
  }
  handler.close
end