Class: Fluent::CratedbOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
HandleTagNameMixin, SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_cratedb.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/fluent/plugin/out_cratedb.rb', line 25

def configure(conf)
  super

  if @column_names.nil?
    fail Fluent::ConfigError, 'column_names MUST specified, but missing'
  end

  @column_names = @column_names.split(',').collect(&:strip)
  @key_names = @key_names.nil? ? @column_names : @key_names.split(',').collect(&:strip)
 
  @servers = @hosts ? @hosts : ["#{@host}:#{@port}"]
end

#format(tag, time, record) ⇒ Object



53
54
55
# File 'lib/fluent/plugin/out_cratedb.rb', line 53

def format(tag, time, record)
  [tag, time, format_proc.call(tag, time, record)].to_msgpack
end

#shutdownObject



49
50
51
# File 'lib/fluent/plugin/out_cratedb.rb', line 49

def shutdown
  super
end

#startObject



38
39
40
41
42
43
44
45
46
47
# File 'lib/fluent/plugin/out_cratedb.rb', line 38

def start
  super
  opts = {:http_options => @http_options, :logger => log}
  begin
    @client = CrateClient.new(@servers, opts)
    log.info "CrateDB connection confirmed: #{@servers.join(", ")}"
  rescue => e
    raise e
  end
end

#write(chunk) ⇒ Object



57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/out_cratedb.rb', line 57

def write(chunk)
  values = []
  chunk.msgpack_each do |tag, time, data|
    #data = format_proc.call(tag, time, data)
    values << data
  end
  sql = "INSERT INTO #{@table} (#{@column_names.join(',')}) VALUES (#{ @column_names.map { |key| '?' }.join(',') })"
  @client.execute(sql, nil, values, @http_options)
end