Class: Fluent::CassandraOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_cassandra.rb

Instance Method Summary collapse

Constructor Details

#initializeCassandraOutput

Returns a new instance of CassandraOutput.



13
14
15
16
17
# File 'lib/fluent/plugin/out_cassandra.rb', line 13

def initialize
  super
  require 'cassandra'
  require 'msgpack'
end

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (ConfigError)


19
20
21
22
23
24
25
26
27
# File 'lib/fluent/plugin/out_cassandra.rb', line 19

def configure(conf)
  super

  raise ConfigError, "'Keyspace' parameter is required on cassandra output"   unless @keyspace = conf['keyspace']
  raise ConfigError, "'ColumnFamily' parameter is required on cassandra output"   unless @columnfamily = conf['columnfamily']

  @host = conf.has_key?('host') ? conf['host'] : 'localhost'
  @port = conf.has_key?('port') ? conf['port'] : 9160
end

#format(tag, time, record) ⇒ Object



38
39
40
# File 'lib/fluent/plugin/out_cassandra.rb', line 38

def format(tag, time,record)
  record.to_msgpack
end

#shutdownObject



34
35
36
# File 'lib/fluent/plugin/out_cassandra.rb', line 34

def shutdown
  super
end

#startObject



29
30
31
32
# File 'lib/fluent/plugin/out_cassandra.rb', line 29

def start
  super
  @connection = Cassandra.new(@keyspace, @host + ':' + @port )
end

#write(chunk) ⇒ Object



42
43
44
45
46
47
48
49
50
# File 'lib/fluent/plugin/out_cassandra.rb', line 42

def write(chunk)
  chunk.msgpack_each  { |record|
          @connection.insert(
                  @columnfamily,
                  record["tag"] + "_" + record["time"].to_s,
                  record
                  )
  }
end