Class: Fluent::CratedbOutput
- Inherits:
- BufferedOutput
- Object
- BufferedOutput
- Fluent::CratedbOutput
- Includes:
- HandleTagNameMixin, SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_cratedb.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/fluent/plugin/out_cratedb.rb', line 25
# Validates and normalizes the plugin configuration.
#
# After delegating to the superclass, the comma-separated +column_names+
# setting (required) is split into an array of stripped names. +key_names+
# is split the same way when present; otherwise it falls back to the
# column-name array. The server list is taken from +hosts+ when set, or
# built from the single +host+/+port+ pair.
#
# @param conf the configuration object, forwarded to +super+
# @raise [Fluent::ConfigError] if +column_names+ is not set
def configure(conf)
  super

  raise Fluent::ConfigError, 'column_names MUST specified, but missing' if @column_names.nil?

  split_list = ->(csv) { csv.split(',').map(&:strip) }

  @column_names = split_list.call(@column_names)
  @key_names =
    if @key_names.nil?
      @column_names
    else
      split_list.call(@key_names)
    end
  @servers = @hosts || ["#{@host}:#{@port}"]
end
#format(tag, time, record) ⇒ Object
53 54 55 |
# File 'lib/fluent/plugin/out_cratedb.rb', line 53
# Serializes a single event for buffering.
#
# The record is first passed through +format_proc+, then packed together
# with its tag and time as a three-element msgpack array.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the msgpack-encoded +[tag, time, formatted_record]+ triple
def format(tag, time, record)
  formatted = format_proc.call(tag, time, record)
  [tag, time, formatted].to_msgpack
end
#shutdown ⇒ Object
49 50 51 |
# File 'lib/fluent/plugin/out_cratedb.rb', line 49
# Shuts the plugin down. Nothing is done here beyond delegating to the
# superclass implementation.
def shutdown
  super()
end
#start ⇒ Object
38 39 40 41 42 43 44 45 46 47 |
# File 'lib/fluent/plugin/out_cratedb.rb', line 38
# Starts the plugin and creates the CrateDB client.
#
# After delegating to the superclass, builds a +CrateClient+ for the
# configured servers, handing it the HTTP options and the plugin logger,
# and then logs the server list.
#
# Any error raised while constructing the client propagates to the caller
# unchanged; no rescue is needed for that (the previous rescue block only
# re-raised the same exception).
def start
  super
  opts = { http_options: @http_options, logger: log }
  @client = CrateClient.new(@servers, opts)
  log.info "CrateDB connection confirmed: #{@servers.join(', ')}"
end
#write(chunk) ⇒ Object
57 58 59 60 61 62 63 64 65 |
# File 'lib/fluent/plugin/out_cratedb.rb', line 57
# Writes a buffered chunk to CrateDB using one parameterized INSERT.
#
# The data element of every event in the chunk is collected, in order,
# into a list. The statement targets +@table+ with the configured column
# names and one +?+ placeholder per column. The collected list is passed
# as the third argument to the client's +execute+ (presumably the
# bulk-argument list; confirm against CrateClient).
#
# @param chunk the buffer chunk; must respond to +msgpack_each+
# @return whatever the client's +execute+ returns
def write(chunk)
  rows = []
  chunk.msgpack_each { |_tag, _time, row| rows << row }

  columns = @column_names.join(',')
  placeholders = Array.new(@column_names.size, '?').join(',')
  statement = "INSERT INTO #{@table} (#{columns}) VALUES (#{placeholders})"

  @client.execute(statement, nil, rows, @http_options)
end