Class: Fluent::CassandraCqlOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_cassandra_cql.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (ConfigError)


26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/fluent/plugin/out_cassandra_cql.rb', line 26

# Validates the plugin configuration and normalizes +schema+ and +data_keys+.
#
# After the parent class's +configure+ runs, each required setting (host,
# port, keyspace, columnfamily, schema, data_keys) is checked for presence.
# The schema string is then evaluated into a Hash and the comma-separated
# data_keys string is split into an Array.
#
# @param conf [Object] configuration object, passed through to +super+
# @return [Array<String>] the split data keys
# @raise [ConfigError] if a required setting is missing, the schema string has
#   fewer than two comma-separated parts, or the schema string does not
#   evaluate to a Hash
def configure(conf)
  super

  # perform validations
  raise ConfigError, "'Host' is required by Cassandra output (ex: localhost, 127.0.0.1, ec2-54-242-141-252.compute-1.amazonaws.com)" if self.host.nil?
  raise ConfigError, "'Port' is required by Cassandra output (ex: 9160)" if self.port.nil?
  raise ConfigError, "'Keyspace' is required by Cassandra output (ex: FluentdLoggers)" if self.keyspace.nil?
  raise ConfigError, "'ColumnFamily' is required by Cassandra output (ex: events)" if self.columnfamily.nil?
  raise ConfigError, "'Schema' is required by Cassandra output (ex: id,ts,payload)" if self.schema.nil?
  raise ConfigError, "'Schema' must contain at least two column names (ex: id,ts,payload)" if self.schema.split(',').count < 2
  raise ConfigError, "'DataKeys' is required by Cassandra output (ex: tag,created_at,data)" if self.data_keys.nil?

  # convert schema from string to hash
  # NOTE(review): eval runs whatever Ruby the configured schema string
  #       contains, so this is only acceptable while the configuration
  #       source is fully trusted.
  parsed_schema =
    begin
      eval(self.schema)
    rescue SyntaxError, StandardError => e
      # Report malformed schema strings as a configuration problem instead of
      # letting a parser or name error escape from configure.
      raise ConfigError, "'Schema' could not be evaluated (#{e.class}: #{e.message})"
    end

  # write calls schema.keys, so reject anything that is not a Hash up front.
  raise ConfigError, "'Schema' must evaluate to a Hash (got #{parsed_schema.class})" unless parsed_schema.is_a?(Hash)

  self.schema = parsed_schema

  # convert data keys from string to array
  self.data_keys = self.data_keys.split(',')
end

#connection ⇒ Object



22
23
24
# File 'lib/fluent/plugin/out_cassandra_cql.rb', line 22

# Returns the connection, creating it on first use.
#
# The connection is built by +get_connection+ from the configured host, port
# and keyspace, and cached in +@connection+ for subsequent calls.
#
# @return [Object] whatever +get_connection+ returned
def connection
  return @connection if @connection

  @connection = get_connection(host, port, keyspace)
end

#format(tag, time, record) ⇒ Object



56
57
58
# File 'lib/fluent/plugin/out_cassandra_cql.rb', line 56

# Serializes a single event for buffering.
#
# Only +record+ contributes to the output; +tag+ and +time+ are accepted
# but not used by this method.
#
# @param tag [Object] event tag (unused here)
# @param time [Object] event time (unused here)
# @param record [#to_msgpack] event payload
# @return [Object] the result of +record.to_msgpack+
def format(tag, time, record)
  record.to_msgpack
end

#shutdown ⇒ Object



52
53
54
# File 'lib/fluent/plugin/out_cassandra_cql.rb', line 52

# Shuts the output down by delegating entirely to the parent class.
#
# NOTE(review): nothing here closes the memoized connection explicitly --
# confirm whether the parent class or the connection helper takes care of it.
def shutdown
  super
end

#start ⇒ Object



47
48
49
50
# File 'lib/fluent/plugin/out_cassandra_cql.rb', line 47

# Starts the output: runs the parent class's start logic, then calls
# +connection+ so the connection is established eagerly at startup.
#
# @return [Object] whatever +connection+ returns
def start
  super
  connection
end

#write(chunk) ⇒ Object



60
61
62
63
64
65
66
67
68
# File 'lib/fluent/plugin/out_cassandra_cql.rb', line 60

# Inserts every record of a buffered chunk into the configured column family.
#
# For each record yielded by +chunk.msgpack_each+, an
# "INSERT ... USING TTL" CQL statement is built and executed on the
# connection.
#
# @param chunk [#msgpack_each] buffer chunk that yields one record at a time
# @return [Object] whatever +chunk.msgpack_each+ returns
def write(chunk)
  # The column list does not depend on the record, so build it once.
  columns = self.schema.keys.join(',')

  chunk.msgpack_each do |record|
    # A fresh keys array is still passed for every record: the helper is
    # defined elsewhere, so it is not assumed to leave its arguments untouched.
    values = build_insert_values_string(self.schema.keys, self.data_keys, record, self.pop_data_keys)

    # NOTE(review): the statement is assembled by string interpolation, so
    # quoting/escaping of values rests entirely on build_insert_values_string
    # -- confirm it is safe for untrusted record content.
    cql = "INSERT INTO #{self.columnfamily} (#{columns}) VALUES (#{values}) USING TTL #{self.ttl}"

    # Use the memoizing accessor instead of reading @connection directly, so
    # a connection is created on demand if none has been set up yet.
    connection.execute(cql)
  end
end