Class: NoSE::Backend::CassandraBackend
- Includes:
- Subtype
- Defined in:
- lib/nose/backend/cassandra.rb
Overview
A backend which communicates with Cassandra via CQL
Defined Under Namespace
Classes: DeleteStatementStep, IndexLookupStatementStep, InsertStatementStep
Instance Method Summary collapse
-
#drop_index(index) ⇒ Object
Drop the table backing the given index from the target database.
-
#generate_id ⇒ Object
Generate a random UUID.
-
#index_empty?(index) ⇒ Boolean
Check if the given index is empty.
-
#index_exists?(index) ⇒ Boolean
Check if a given index exists in the target database.
-
#index_insert_chunk(index, chunk) ⇒ Array<Array<Cassandra::Uuid>>
Insert a chunk of rows into an index.
-
#index_sample(index, count) ⇒ Object
Sample a number of values from the given index.
-
#indexes_ddl(execute = false, skip_existing = false, drop_existing = false) ⇒ Object
Produce the DDL necessary for column families for the given indexes and optionally execute them against the server.
-
#initialize(model, indexes, plans, update_plans, config) ⇒ CassandraBackend
constructor
A new instance of CassandraBackend.
Methods included from Subtype
Methods inherited from Backend
#by_id_graph, #indexes_sample, #prepare, #prepare_query, #prepare_update, #query, #update
Methods included from Supertype
Methods included from Listing
Constructor Details
#initialize(model, indexes, plans, update_plans, config) ⇒ CassandraBackend
Returns a new instance of CassandraBackend.
12 13 14 15 16 17 18 19 |
# File 'lib/nose/backend/cassandra.rb', line 12

# Create a backend from the connection settings in +config+.
#
# All five arguments are forwarded unchanged to the superclass
# constructor by the bare +super+; only +config+ is read here.
#
# @param config [#[]] settings looked up under the :hosts, :port and
#   :keyspace keys
def initialize(model, indexes, plans, update_plans, config)
  super

  @hosts, @port, @keyspace =
    config[:hosts], config[:port], config[:keyspace]

  # Kept for the lifetime of the backend; generate_id draws UUIDs from it
  @generator = Cassandra::Uuid::Generator.new
end
Instance Method Details
#drop_index(index) ⇒ Object
Drop the table backing the given index from the target database
85 86 87 |
# File 'lib/nose/backend/cassandra.rb', line 85

# Drop the table backing the given index from the target database.
#
# @param index [#key] index whose +key+ is used as the table name
# @return [Object] whatever the client returns for the DROP statement
def drop_index(index)
  session = client
  session.execute(%(DROP TABLE "#{index.key}"))
end
#generate_id ⇒ Object
Generate a random UUID
22 23 24 |
# File 'lib/nose/backend/cassandra.rb', line 22

# Generate a random UUID.
#
# @return [Object] the value produced by the UUID generator created in
#   the constructor
def generate_id
  uuid_source = @generator
  uuid_source.uuid
end
#index_empty?(index) ⇒ Boolean
Check if the given index is empty
73 74 75 76 |
# File 'lib/nose/backend/cassandra.rb', line 73

# Check if the given index is empty.
#
# @param index [#key] index whose +key+ is used as the table name
# @return [Boolean] true when the count reported by the server is zero
def index_empty?(index)
  cql = %(SELECT COUNT(*) FROM "#{index.key}" LIMIT 1)

  # The count is read from the first column of the first result row
  count_row = client.execute(cql).first
  row_count = count_row.values.first
  row_count.zero?
end
#index_exists?(index) ⇒ Boolean
Check if a given index exists in the target database
79 80 81 82 |
# File 'lib/nose/backend/cassandra.rb', line 79

# Check if a given index exists in the target database.
#
# @param index [#key] index whose +key+ is used as the table name
# @return [Boolean] result of the keyspace metadata's +has_table?+ check
def index_exists?(index)
  # Called only for its side effect; presumably this establishes the
  # connection and populates @cluster -- TODO confirm against +client+
  client

  keyspace_metadata = @cluster.keyspace(@keyspace)
  keyspace_metadata.has_table?(index.key)
end
#index_insert_chunk(index, chunk) ⇒ Array<Array<Cassandra::Uuid>>
Insert a chunk of rows into an index
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/nose/backend/cassandra.rb', line 51

# Insert a chunk of rows into an index.
#
# A single INSERT statement is prepared for the index and every row in
# the chunk is added to one batch, which is then executed.
#
# @param index [Object] index providing +key+, +all_fields+,
#   +hash_fields+ and +order_fields+
# @param chunk [Enumerable] rows to insert; each one is converted with
#   +index_row+ before being bound to the statement
# @return [Array<Array<Cassandra::Uuid>>] for each inserted row, the
#   values of the hash fields followed by the order fields
def index_insert_chunk(index, chunk)
  fields = index.all_fields.to_a
  placeholders = (['?'] * fields.length).join ', '
  statement = client.prepare(
    "INSERT INTO \"#{index.key}\" (" \
    "#{field_names fields}" \
    ") VALUES (#{placeholders})"
  )

  # The key columns sit at the same positions in every row, so resolve
  # them once instead of searching +fields+ again for each row
  key_fields = index.hash_fields.to_a + index.order_fields
  key_positions = key_fields.map { |field| fields.index field }

  ids = []
  client.execute(client.batch do |batch|
    chunk.each do |row|
      values = index_row(row, fields)
      ids << key_positions.map { |position| values[position] }
      batch.add statement, arguments: values
    end
  end)

  ids
end
#index_sample(index, count) ⇒ Object
Sample a number of values from the given index
90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/nose/backend/cassandra.rb', line 90

# Sample a number of values from the given index.
#
# @param index [Object] index providing +key+ and +all_fields+ (each
#   field's +id+ is used as a quoted column name)
# @param count [Object] interpolated directly into the LIMIT clause
# @return [Object] the +rows+ of the result returned by the client
def index_sample(index, count)
  columns = index.all_fields.map { |field| %("#{field.id}") }.join ', '
  cql = %(SELECT #{columns} FROM "#{index.key}" LIMIT #{count})

  # XXX Ignore null values for now
  # fail if rows.any? { |row| row.values.any?(&:nil?) }

  client.execute(cql).rows
end
#indexes_ddl(execute = false, skip_existing = false, drop_existing = false) ⇒ Object
Produce the DDL necessary for column families for the given indexes and optionally execute them against the server
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/nose/backend/cassandra.rb', line 28

# Produce the DDL necessary for column families for the given indexes
# and optionally execute them against the server.
#
# The work is lazy: nothing is generated, dropped or executed until the
# returned enumerator is iterated. Each DDL string is yielded before any
# statement for that index is run.
#
# @param execute [Boolean] run each DDL statement after yielding it
# @param skip_existing [Boolean] move on to the next index when the
#   server reports that the table already exists
# @param drop_existing [Boolean] drop a table that already exists; note
#   that the drop happens even when +execute+ is false
# @return [Enumerator] yields one DDL string per index
# @raise [IndexAlreadyExists] if the table already exists and
#   +skip_existing+ is false
def indexes_ddl(execute = false, skip_existing = false,
                drop_existing = false)
  Enumerator.new do |enum|
    @indexes.each do |index|
      ddl = index_cql index
      enum.yield ddl

      begin
        drop_index(index) if drop_existing && index_exists?(index)
        client.execute(ddl) if execute
      rescue Cassandra::Errors::AlreadyExistsError => exc
        next if skip_existing

        # Re-raise as IndexAlreadyExists, carrying over the driver
        # error's message and backtrace
        new_exc = IndexAlreadyExists.new exc.message
        new_exc.set_backtrace exc.backtrace
        raise new_exc
      end
    end
  end
end