Class: Fluent::CassandraCqlOutput
- Inherits:
- BufferedOutput
- Object
- BufferedOutput
- Fluent::CassandraCqlOutput
- Defined in:
- lib/fluent/plugin/out_cassandra_cql.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #connection ⇒ Object
- #format(tag, time, record) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/fluent/plugin/out_cassandra_cql.rb', line 26

# Validates the plugin settings and converts +schema+ and +data_keys+ from
# their string forms into the structures used when writing.
#
# After delegating to the superclass, each required setting is checked for
# presence, +schema+ is evaluated from a string into a Hash, and +data_keys+
# is split on commas into an Array.
#
# @param conf the configuration object; it is handed to the superclass via
#   +super+ and is not read directly by this method
# @return [Array<String>] the converted data keys (value of the last assignment)
# @raise [ConfigError] if a required setting is missing, if the schema string
#   names fewer than two comma-separated entries, or if it does not evaluate
#   to a Hash
def configure(conf)
  super

  # perform validations
  raise ConfigError, "'Host' is required by Cassandra output (ex: localhost, 127.0.0.1, ec2-54-242-141-252.compute-1.amazonaws.com)" if host.nil?
  raise ConfigError, "'Port' is required by Cassandra output (ex: 9160)" if port.nil?
  raise ConfigError, "'Keyspace' is required by Cassandra output (ex: FluentdLoggers)" if keyspace.nil?
  raise ConfigError, "'ColumnFamily' is required by Cassandra output (ex: events)" if columnfamily.nil?
  raise ConfigError, "'Schema' is required by Cassandra output (ex: id,ts,payload)" if schema.nil?
  raise ConfigError, "'Schema' must contain at least two column names (ex: id,ts,payload)" if schema.split(',').count < 2
  raise ConfigError, "'DataKeys' is required by Cassandra output (ex: tag,created_at,data)" if data_keys.nil?

  # convert schema from string to hash
  # NOTE(review): eval executes whatever Ruby the schema setting contains.
  # The original author considered this acceptable because the string is
  # not user supplied; keep it only while the configuration is trusted.
  self.schema = eval(schema)

  # The schema's keys are used as the column list later on, so anything
  # other than a Hash is rejected here with a configuration error instead
  # of surfacing as a NoMethodError at write time.
  raise ConfigError, "'Schema' must evaluate to a Hash keyed by column name" unless schema.is_a?(Hash)

  # convert data keys from string to array
  self.data_keys = data_keys.split(',')
end
#connection ⇒ Object
22 23 24 |
# File 'lib/fluent/plugin/out_cassandra_cql.rb', line 22

# Returns the connection for this output, creating it on first use.
#
# The first call builds the connection with +get_connection+ from the
# configured host, port and keyspace and caches it in +@connection+; later
# calls return the cached object.
#
# @return [Object] whatever +get_connection+ returned
def connection
  return @connection if @connection

  @connection = get_connection(self.host, self.port, self.keyspace)
end
#format(tag, time, record) ⇒ Object
56 57 58 |
# File 'lib/fluent/plugin/out_cassandra_cql.rb', line 56

# Serializes a single event for buffering.
#
# Only +record+ contributes to the result; +tag+ and +time+ are accepted
# but not used by this method.
#
# @param tag unused here
# @param time unused here
# @param record the event payload; must respond to +to_msgpack+
# @return the result of +record.to_msgpack+
def format(tag, time, record)
  record.to_msgpack
end
#shutdown ⇒ Object
52 53 54 |
# File 'lib/fluent/plugin/out_cassandra_cql.rb', line 52

# Shuts the output down by delegating to the superclass; this method adds
# no teardown of its own.
def shutdown
  super
end
#start ⇒ Object
47 48 49 50 |
# File 'lib/fluent/plugin/out_cassandra_cql.rb', line 47

# Starts the output: runs the superclass start-up, then calls +connection+
# so the connection object is created during start rather than on first use.
#
# @return [Object] the value returned by +connection+
def start
  super
  connection
end
#write(chunk) ⇒ Object
60 61 62 63 64 65 66 67 68 |
# File 'lib/fluent/plugin/out_cassandra_cql.rb', line 60

# Writes every record of a buffered chunk as one CQL INSERT statement.
#
# For each record yielded by +chunk.msgpack_each+, a values string is built
# with +build_insert_values_string+ and interpolated into an
# "INSERT INTO ... VALUES ... USING TTL ..." statement, which is then
# executed on the connection.
#
# The statement is executed through the +connection+ reader instead of the
# raw +@connection+ instance variable, so this method does not fail on a nil
# connection when the variable has not been populated yet.
#
# NOTE(review): the statement is assembled by string interpolation; any
# quoting or escaping of record values has to happen inside
# +build_insert_values_string+, which is defined outside this method —
# confirm it handles untrusted record content.
#
# @param chunk the buffered chunk; must respond to +msgpack_each+
# @return the result of +chunk.msgpack_each+
def write(chunk)
  chunk.msgpack_each do |record|
    values = build_insert_values_string(schema.keys, data_keys, record, pop_data_keys)
    cql = "INSERT INTO #{columnfamily} (#{schema.keys.join(',')}) " \
          "VALUES (#{values}) " \
          "USING TTL #{ttl}"
    connection.execute(cql)
  end
end