Class: Fluent::CassandraOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::CassandraOutput
- Includes:
- SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_cassandra.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ CassandraOutput
constructor
A new instance of CassandraOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ CassandraOutput
Returns a new instance of CassandraOutput.
13 14 15 16 17 |
# File 'lib/fluent/plugin/out_cassandra.rb', line 13 def initialize super require 'cassandra' require 'msgpack' end |
Instance Method Details
#configure(conf) ⇒ Object
19 20 21 22 23 24 25 26 27 |
# File 'lib/fluent/plugin/out_cassandra.rb', line 19 def configure(conf) super raise ConfigError, "'Keyspace' parameter is required on cassandra output" unless @keyspace = conf['keyspace'] raise ConfigError, "'ColumnFamily' parameter is required on cassandra output" unless @columnfamily = conf['columnfamily'] @host = conf.has_key?('host') ? conf['host'] : 'localhost' @port = conf.has_key?('port') ? conf['port'] : 9160 end |
#format(tag, time, record) ⇒ Object
38 39 40 |
# File 'lib/fluent/plugin/out_cassandra.rb', line 38 def format(tag, time,record) record.to_msgpack end |
#shutdown ⇒ Object
34 35 36 |
# File 'lib/fluent/plugin/out_cassandra.rb', line 34 def shutdown super end |
#start ⇒ Object
29 30 31 32 |
# File 'lib/fluent/plugin/out_cassandra.rb', line 29 def start super @connection = Cassandra.new(@keyspace, @host + ':' + @port ) end |
#write(chunk) ⇒ Object
42 43 44 45 46 47 48 49 50 |
# File 'lib/fluent/plugin/out_cassandra.rb', line 42 def write(chunk) chunk.msgpack_each { |record| @connection.insert( @columnfamily, record["tag"] + "_" + record["time"].to_s, record ) } end |