Class: Cassandra
- Inherits:
-
Object
- Object
- Cassandra
- Defined in:
- lib/cassandra/cassandra.rb,
lib/cassandra.rb,
lib/cassandra/0.6.rb,
lib/cassandra/0.7.rb,
lib/cassandra/0.8.rb,
lib/cassandra/1.0.rb,
lib/cassandra/1.1.rb,
lib/cassandra/long.rb,
lib/cassandra/batch.rb,
lib/cassandra/columns.rb,
lib/cassandra/helpers.rb,
lib/cassandra/keyspace.rb,
lib/cassandra/protocol.rb,
lib/cassandra/composite.rb,
lib/cassandra/constants.rb,
lib/cassandra/comparable.rb,
lib/cassandra/0.6/columns.rb,
lib/cassandra/0.7/columns.rb,
lib/cassandra/0.8/columns.rb,
lib/cassandra/0.6/protocol.rb,
lib/cassandra/0.7/protocol.rb,
lib/cassandra/0.8/protocol.rb,
lib/cassandra/ordered_hash.rb,
lib/cassandra/0.6/cassandra.rb,
lib/cassandra/0.7/cassandra.rb,
lib/cassandra/0.8/cassandra.rb,
lib/cassandra/column_family.rb,
lib/cassandra/dynamic_composite.rb,
lib/cassandra/mock.rb
Overview
OrderedHash is namespaced to prevent conflicts with other implementations
Defined Under Namespace
Modules: Columns, Consistency, Constants, Helpers, Protocol Classes: AccessError, Batch, ColumnFamily, Comparable, Composite, DynamicComposite, Keyspace, Long, Mock, OrderedHash, OrderedHashInt
Constant Summary collapse
- WRITE_DEFAULTS =
{ :count => 1000, :timestamp => nil, :consistency => Consistency::ONE, :ttl => nil }
- READ_DEFAULTS =
{ :count => 100, :start => nil, :finish => nil, :reversed => false, :consistency => Consistency::ONE }
- THRIFT_DEFAULTS =
{ :transport_wrapper => Thrift::FramedTransport, :thrift_client_class => ThriftClient }
Instance Attribute Summary collapse
-
#auth_request ⇒ Object
readonly
Returns the value of attribute auth_request.
-
#keyspace ⇒ Object
Returns the value of attribute keyspace.
-
#servers ⇒ Object
readonly
Returns the value of attribute servers.
-
#thrift_client_class ⇒ Object
readonly
Returns the value of attribute thrift_client_class.
-
#thrift_client_options ⇒ Object
readonly
Returns the value of attribute thrift_client_options.
Class Method Summary collapse
Instance Method Summary collapse
- #_run_checkin_callbacks ⇒ Object
-
#add(column_family, key, value, *columns_and_options) ⇒ Object
Add a value to the counter in cf:key:super column:column.
-
#add_column_family(cf_def) ⇒ Object
Creates a new column family from the passed in Cassandra::ColumnFamily instance, and returns the schema id.
-
#add_keyspace(ks_def) ⇒ Object
Add keyspace using the passed in keyspace definition.
-
#add_multiple_columns(column_family, key, hash, options = {}) ⇒ Object
Increment one or more counters in a single row.
-
#batch(options = {}) ⇒ Object
Open a batch operation and yield self.
-
#clear_keyspace!(options = {}) ⇒ Object
Remove all rows in the keyspace.
-
#cluster_name ⇒ Object
Returns the string name specified for the cluster.
-
#column_families ⇒ Object
Return a hash of column_family definitions indexed by their names.
- #connected? ⇒ Boolean
-
#count_columns(column_family, key, *columns_and_options) ⇒ Object
Count the columns for the provided parameters.
-
#count_range(column_family, options = {}) ⇒ Object
Count all rows in the column_family you request.
-
#create_index(keyspace, column_family, column_name, validation_class) ⇒ Object
Create secondary index.
-
#create_index_clause(index_expressions, start = "", count = 100) ⇒ Object
(also: #create_idx_clause)
This method takes an array if CassandraThrift::IndexExpression objects and creates a CassandraThrift::IndexClause for use in the Cassandra#get_index_slices.
-
#create_index_expression(column_name, value, comparison) ⇒ Object
(also: #create_idx_expr)
This method is mostly used internally by get_index_slices to create a CassandraThrift::IndexExpression for the given options.
-
#default_read_consistency=(value) ⇒ Object
The initial default consistency is set to ONE, but you can use this method to override the normal default with your specified value.
-
#default_write_consistency=(value) ⇒ Object
The initial default consistency is set to ONE, but you can use this method to override the normal default with your specified value.
-
#disable_node_auto_discovery! ⇒ Object
This is primarily helpful when the cassandra cluster is communicating internally on a different ip address than what you are using to connect.
-
#disconnect! ⇒ Object
Disconnect the current client connection.
-
#drop_column_family(column_family) ⇒ Object
Delete the specified column family.
-
#drop_index(keyspace, column_family, column_name) ⇒ Object
Delete secondary index.
-
#drop_keyspace(keyspace = @keyspace) ⇒ Object
Deletes keyspace using the passed in keyspace name.
-
#each(column_family, options = {}) ⇒ Object
Iterate through each row in the given column family.
-
#each_key(column_family, options = {}) ⇒ Object
Iterate through each key within the given parameters.
-
#exists?(column_family, key, *columns_and_options) ⇒ Boolean
Return true if the column_family:key::[sub_column] path you request exists.
-
#flush_batch(options) ⇒ Object
Send the batch queue to the server.
-
#get(column_family, key, *columns_and_options) ⇒ Object
Return a hash (actually, a Cassandra::OrderedHash) or a single value representing the element at the column_family:key::[sub_column] path you request.
-
#get_columns(column_family, key, *columns_and_options) ⇒ Object
Return a hash of column value pairs for the path you request.
-
#get_columns_as_hash(column_family, key, columns, options = nil) ⇒ Object
Return a hash of column value pairs for the path you request.
-
#get_indexed_slices(column_family, index_clause, *columns_and_options) ⇒ Object
This method is used to query a secondary index with a set of provided search parameters.
-
#get_range(column_family, options = {}, &blk) ⇒ Object
Return an Cassandra::OrderedHash containing the columns specified for the given range of keys in the column_family you request.
-
#get_range_batch(column_family, options = {}) ⇒ Object
Return an Cassandra::OrderedHash containing the columns specified for the given range of keys in the column_family you request.
-
#get_range_keys(column_family, options = {}) ⇒ Object
Return an Array containing all of the keys within a given range.
-
#get_range_single(column_family, options = {}) ⇒ Object
Return an Cassandra::OrderedHash containing the columns specified for the given range of keys in the column_family you request.
-
#get_slice(column_family, key, column, start, finish, count, reversed, consistency, opts = nil) ⇒ Object
Selecting a slice of a column.
-
#get_value(column_family, key, column, consistency) ⇒ Object
Returns the value of the column.
-
#initialize(keyspace, servers = "127.0.0.1:9160", thrift_client_options = {}) ⇒ Cassandra
constructor
Create a new Cassandra instance and open the connection.
-
#insert(column_family, key, hash, options = {}) ⇒ Object
This is the main method used to insert rows into cassandra.
- #inspect ⇒ Object
-
#keyspaces ⇒ Object
Returns an array of available keyspaces.
-
#login!(username, password) ⇒ Object
Issues a login attempt using the username and password specified.
-
#multi_count_columns(column_family, keys, *options) ⇒ Object
Multi-key version of Cassandra#count_columns.
-
#multi_get(column_family, keys, *columns_and_options) ⇒ Object
Multi-key version of Cassandra#get.
-
#multi_get_columns(column_family, keys, *columns_and_options) ⇒ Object
Multi-key version of Cassandra#get_columns.
-
#partitioner ⇒ Object
Returns a string identifying which partitioner is in use by the current cluster.
-
#remove(column_family, key, *columns_and_options) ⇒ Object
This method is used to delete (actually marking them as deleted with a tombstone) rows, columns, or super columns depending on the parameters passed.
-
#rename_column_family(old_name, new_name) ⇒ Object
Rename a column family.
-
#rename_keyspace(old_name, new_name) ⇒ Object
Renames keyspace.
- #requires_reloading? ⇒ Boolean
-
#ring ⇒ Object
Returns an array of CassandraThrift::TokenRange objects indicating which servers make up the current ring.
-
#run_callbacks(method) ⇒ Object
method required by ActiveRecord::ConnectionAdapters::ConnectionPool.
-
#schema_agreement? ⇒ Boolean
This returns true if all servers are in agreement on the schema.
-
#update_column_family(cf_def) ⇒ Object
Update the column family based on the passed in definition.
-
#update_keyspace(ks_def) ⇒ Object
Update the keyspace using the passed in keyspace definition.
- #verify! ⇒ Object
-
#version ⇒ Object
Lists the current cassandra.thrift version.
Methods included from Helpers
#extract_and_validate_params, #s_map
Methods included from Columns
#_standard_counter_mutation, #_super_counter_mutation
Constructor Details
#initialize(keyspace, servers = "127.0.0.1:9160", thrift_client_options = {}) ⇒ Cassandra
Create a new Cassandra instance and open the connection.
76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/cassandra/cassandra.rb', line 76 def initialize(keyspace, servers = "127.0.0.1:9160", = {}) @is_super = {} @column_name_class = {} @sub_column_name_class = {} @column_name_maker = {} @sub_column_name_maker = {} @auto_discover_nodes = true [:transport_wrapper] ||= Cassandra.DEFAULT_TRANSPORT_WRAPPER @thrift_client_options = THRIFT_DEFAULTS.merge() @thrift_client_class = @thrift_client_options[:thrift_client_class] @keyspace = keyspace @servers = Array(servers) end |
Instance Attribute Details
#auth_request ⇒ Object (readonly)
Returns the value of attribute auth_request.
69 70 71 |
# File 'lib/cassandra/cassandra.rb', line 69 def auth_request @auth_request end |
#keyspace ⇒ Object
Returns the value of attribute keyspace.
69 70 71 |
# File 'lib/cassandra/cassandra.rb', line 69 def keyspace @keyspace end |
#servers ⇒ Object (readonly)
Returns the value of attribute servers.
69 70 71 |
# File 'lib/cassandra/cassandra.rb', line 69 def servers @servers end |
#thrift_client_class ⇒ Object (readonly)
Returns the value of attribute thrift_client_class.
69 70 71 |
# File 'lib/cassandra/cassandra.rb', line 69 def thrift_client_class @thrift_client_class end |
#thrift_client_options ⇒ Object (readonly)
Returns the value of attribute thrift_client_options.
69 70 71 |
# File 'lib/cassandra/cassandra.rb', line 69 def @thrift_client_options end |
Class Method Details
.DEFAULT_TRANSPORT_WRAPPER ⇒ Object
71 72 73 |
# File 'lib/cassandra/cassandra.rb', line 71 def self.DEFAULT_TRANSPORT_WRAPPER Thrift::FramedTransport end |
.VERSION ⇒ Object
2 3 4 |
# File 'lib/cassandra/0.6.rb', line 2 def self.VERSION "0.6" end |
Instance Method Details
#_run_checkin_callbacks ⇒ Object
1142 1143 1144 |
# File 'lib/cassandra/cassandra.rb', line 1142 def _run_checkin_callbacks yield if block_given? end |
#add(column_family, key, value, *columns_and_options) ⇒ Object
Add a value to the counter in cf:key:super column:column
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/cassandra/0.8/cassandra.rb', line 6 def add(column_family, key, value, *) column_family, column, sub_column, = extract_and_validate_params(column_family, key, , WRITE_DEFAULTS) mutation_map = if is_super(column_family) { key => { column_family => [_super_counter_mutation(column_family, column, sub_column, value)] } } else { key => { column_family => [_standard_counter_mutation(column_family, column, value)] } } end @batch ? @batch << [mutation_map, [:consistency]] : _mutate(mutation_map, [:consistency]) end |
#add_column_family(cf_def) ⇒ Object
Creates a new column family from the passed in Cassandra::ColumnFamily instance, and returns the schema id.
270 271 272 273 274 275 276 277 278 279 280 |
# File 'lib/cassandra/cassandra.rb', line 270 def add_column_family(cf_def) return false if Cassandra.VERSION.to_f < 0.7 begin res = client.system_add_column_family(cf_def) rescue CassandraThrift::TimedOutException => te puts "Timed out: #{te.inspect}" end @schema = nil res end |
#add_keyspace(ks_def) ⇒ Object
Add keyspace using the passed in keyspace definition.
Returns the new schema id.
337 338 339 340 341 342 343 344 345 346 347 348 349 |
# File 'lib/cassandra/cassandra.rb', line 337 def add_keyspace(ks_def) return false if Cassandra.VERSION.to_f < 0.7 begin res = client.system_add_keyspace(ks_def) rescue CassandraThrift::TimedOutException => toe puts "Timed out: #{toe.inspect}" rescue Thrift::TransportException => te puts "Timed out: #{te.inspect}" end @keyspaces = nil res end |
#add_multiple_columns(column_family, key, hash, options = {}) ⇒ Object
Increment one or more counters in a single row.
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/cassandra/0.8/cassandra.rb', line 27 def add_multiple_columns(column_family, key, hash, = {}) column_family, _, _, = extract_and_validate_params(column_family, key, [], WRITE_DEFAULTS) mutation_map = if is_super(column_family) { key => { column_family => hash.collect do |column, sub_hash| sub_hash.collect do |sub_column, value| _super_counter_mutation(column_family, column, sub_column, value) end end.flatten } } else { key => { column_family => hash.collect { |column, value| _standard_counter_mutation(column_family, column, value) } } } end @batch ? @batch << [mutation_map, [:consistency]] : _mutate(mutation_map, [:consistency]) end |
#batch(options = {}) ⇒ Object
Open a batch operation and yield self. Inserts and deletes will be queued until the block closes, and then sent atomically to the server. Supports the :consistency
option, which overrides the consistency set in the individual commands.
888 889 890 891 892 893 894 895 896 897 898 |
# File 'lib/cassandra/cassandra.rb', line 888 def batch( = {}) @batch = Cassandra::Batch.new(self, ) _, _, _, = extract_and_validate_params(schema.cf_defs.first.name, "", [], WRITE_DEFAULTS) yield(self) flush_batch() ensure @batch = nil end |
#clear_keyspace!(options = {}) ⇒ Object
Remove all rows in the keyspace. Supports options :consistency
and :timestamp
. FIXME May not currently delete all records without multiple calls. Waiting for ranged remove support in Cassandra.
260 261 262 263 264 |
# File 'lib/cassandra/cassandra.rb', line 260 def clear_keyspace! return false if Cassandra.VERSION.to_f < 0.7 schema.cf_defs.each { |cfdef| truncate!(cfdef.name) } end |
#cluster_name ⇒ Object
Returns the string name specified for the cluster.
Please note that this only works on version 0.7.0 and higher.
209 210 211 212 213 |
# File 'lib/cassandra/cassandra.rb', line 209 def cluster_name return false if Cassandra.VERSION.to_f < 0.7 @cluster_name ||= client.describe_cluster_name() end |
#column_families ⇒ Object
Return a hash of column_family definitions indexed by their names
164 165 166 167 168 |
# File 'lib/cassandra/cassandra.rb', line 164 def column_families return false if Cassandra.VERSION.to_f < 0.7 schema.cf_defs.inject(Hash.new){|memo, cf_def| memo[cf_def.name] = cf_def; memo;} end |
#connected? ⇒ Boolean
1150 1151 1152 |
# File 'lib/cassandra/cassandra.rb', line 1150 def connected? @client && @client.current_server end |
#count_columns(column_family, key, *columns_and_options) ⇒ Object
Count the columns for the provided parameters.
-
column_family - The column_family that you are inserting into.
-
key - The row key to insert.
-
columns - Either a single super_column or a list of columns.
-
sub_columns - The list of sub_columns to select.
-
options - Valid options are:
-
:start - The column name to start from.
-
:stop - The column name to stop at.
-
:count - The maximum count of columns to return. (By default cassandra will count up to 100 columns)
-
:consistency - Uses the default read consistency if none specified.
-
520 521 522 523 524 |
# File 'lib/cassandra/cassandra.rb', line 520 def count_columns(column_family, key, *) column_family, super_column, _, = extract_and_validate_params(column_family, key, , READ_DEFAULTS) _count_columns(column_family, key, super_column, [:start], [:stop], [:count], [:consistency]) end |
#count_range(column_family, options = {}) ⇒ Object
Count all rows in the column_family you request.
This method just calls Cassandra#get_range_keys and returns the number of records returned.
See Cassandra#get_range for options.
836 837 838 |
# File 'lib/cassandra/cassandra.rb', line 836 def count_range(column_family, = {}) get_range_keys(column_family, ).length end |
#create_index(keyspace, column_family, column_name, validation_class) ⇒ Object
Create secondary index.
-
keyspace
-
column_family
-
column_name
-
validation_class
925 926 927 928 929 930 931 932 933 934 935 936 937 938 |
# File 'lib/cassandra/cassandra.rb', line 925 def create_index(keyspace, column_family, column_name, validation_class) return false if Cassandra.VERSION.to_f < 0.7 cf_def = client.describe_keyspace(keyspace).cf_defs.find{|x| x.name == column_family} if !cf_def.nil? and !cf_def..find{|x| x.name == column_name} c_def = CassandraThrift::ColumnDef.new do |cd| cd.name = column_name cd.validation_class = "org.apache.cassandra.db.marshal."+validation_class cd.index_type = CassandraThrift::IndexType::KEYS end cf_def..push(c_def) update_column_family(cf_def) end end |
#create_index_clause(index_expressions, start = "", count = 100) ⇒ Object Also known as: create_idx_clause
This method takes an array if CassandraThrift::IndexExpression objects and creates a CassandraThrift::IndexClause for use in the Cassandra#get_index_slices
-
index_expressions - Array of CassandraThrift::IndexExpressions.
-
start - The starting row key.
-
count - The count of items to be returned
995 996 997 998 999 1000 1001 1002 |
# File 'lib/cassandra/cassandra.rb', line 995 def create_index_clause(index_expressions, start = "", count = 100) return false if Cassandra.VERSION.to_f < 0.7 CassandraThrift::IndexClause.new( :start_key => start, :expressions => index_expressions, :count => count) end |
#create_index_expression(column_name, value, comparison) ⇒ Object Also known as: create_idx_expr
This method is mostly used internally by get_index_slices to create a CassandraThrift::IndexExpression for the given options.
-
column_name - Column to be compared
-
value - Value to compare against
-
comparison - Type of comparison to do.
965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 |
# File 'lib/cassandra/cassandra.rb', line 965 def create_index_expression(column_name, value, comparison) return false if Cassandra.VERSION.to_f < 0.7 CassandraThrift::IndexExpression.new( :column_name => column_name, :value => value, :op => (case comparison when nil, "EQ", "eq", "==" CassandraThrift::IndexOperator::EQ when "GTE", "gte", ">=" CassandraThrift::IndexOperator::GTE when "GT", "gt", ">" CassandraThrift::IndexOperator::GT when "LTE", "lte", "<=" CassandraThrift::IndexOperator::LTE when "LT", "lt", "<" CassandraThrift::IndexOperator::LT end )) end |
#default_read_consistency=(value) ⇒ Object
The initial default consistency is set to ONE, but you can use this method to override the normal default with your specified value. Use this if you do not want to specify a read consistency for each query.
423 424 425 |
# File 'lib/cassandra/cassandra.rb', line 423 def default_read_consistency=(value) READ_DEFAULTS[:consistency] = value end |
#default_write_consistency=(value) ⇒ Object
The initial default consistency is set to ONE, but you can use this method to override the normal default with your specified value. Use this if you do not want to specify a write consistency for each insert statement.
414 415 416 |
# File 'lib/cassandra/cassandra.rb', line 414 def default_write_consistency=(value) WRITE_DEFAULTS[:consistency] = value end |
#disable_node_auto_discovery! ⇒ Object
This is primarily helpful when the cassandra cluster is communicating internally on a different ip address than what you are using to connect. A prime example of this would be when using EC2 to host a cluster. Typically, the cluster would be communicating over the local ip addresses issued by Amazon, but any clients connecting from outside EC2 would need to use the public ip.
102 103 104 |
# File 'lib/cassandra/cassandra.rb', line 102 def disable_node_auto_discovery! @auto_discover_nodes = false end |
#disconnect! ⇒ Object
Disconnect the current client connection.
109 110 111 112 113 114 |
# File 'lib/cassandra/cassandra.rb', line 109 def disconnect! if @client @client.disconnect! @client = nil end end |
#drop_column_family(column_family) ⇒ Object
Delete the specified column family. Return the new schema id.
-
column_family - The column_family name to drop.
287 288 289 290 291 292 293 294 295 296 297 |
# File 'lib/cassandra/cassandra.rb', line 287 def drop_column_family(column_family) return false if Cassandra.VERSION.to_f < 0.7 begin res = client.system_drop_column_family(column_family) rescue CassandraThrift::TimedOutException => te puts "Timed out: #{te.inspect}" end @schema = nil res end |
#drop_index(keyspace, column_family, column_name) ⇒ Object
Delete secondary index.
-
keyspace
-
column_family
-
column_name
947 948 949 950 951 952 953 954 955 |
# File 'lib/cassandra/cassandra.rb', line 947 def drop_index(keyspace, column_family, column_name) return false if Cassandra.VERSION.to_f < 0.7 cf_def = client.describe_keyspace(keyspace).cf_defs.find{|x| x.name == column_family} if !cf_def.nil? and cf_def..find{|x| x.name == column_name} cf_def..delete_if{|x| x.name == column_name} update_column_family(cf_def) end end |
#drop_keyspace(keyspace = @keyspace) ⇒ Object
Deletes keyspace using the passed in keyspace name.
Returns the new schema id.
356 357 358 359 360 361 362 363 364 365 366 367 368 369 |
# File 'lib/cassandra/cassandra.rb', line 356 def drop_keyspace(keyspace=@keyspace) return false if Cassandra.VERSION.to_f < 0.7 begin res = client.system_drop_keyspace(keyspace) rescue CassandraThrift::TimedOutException => toe puts "Timed out: #{toe.inspect}" rescue Thrift::TransportException => te puts "Timed out: #{te.inspect}" end keyspace = "system" if keyspace.eql?(@keyspace) @keyspaces = nil res end |
#each(column_family, options = {}) ⇒ Object
Iterate through each row in the given column family
This method just calls Cassandra#get_range and yields the key and columns.
See Cassandra#get_range for options.
874 875 876 877 878 |
# File 'lib/cassandra/cassandra.rb', line 874 def each(column_family, = {}) get_range_batch(column_family, ) do |key, columns| yield key, columns end end |
#each_key(column_family, options = {}) ⇒ Object
Iterate through each key within the given parameters. This function can be used to iterate over each key in the given column family.
This method just calls Cassandra#get_range and yields each row key.
See Cassandra#get_range for options.
860 861 862 863 864 |
# File 'lib/cassandra/cassandra.rb', line 860 def each_key(column_family, = {}) get_range_batch(column_family, ) do |key, columns| yield key end end |
#exists?(column_family, key, *columns_and_options) ⇒ Boolean
Return true if the column_family:key::[sub_column] path you request exists.
If passed in only a row key it will query for any columns (limiting to 1) for that row key. If a column is passed in it will query for that specific column/super column.
This method will return true or false.
-
column_family - The column_family that you are inserting into.
-
key - The row key to insert.
-
columns - Either a single super_column or a list of columns.
-
sub_columns - The list of sub_columns to select.
-
options - Valid options are:
-
:consistency - Uses the default read consistency if none specified.
-
688 689 690 691 692 693 694 695 696 697 698 |
# File 'lib/cassandra/cassandra.rb', line 688 def exists?(column_family, key, *) column_family, column, sub_column, = extract_and_validate_params(column_family, key, , READ_DEFAULTS) result = if column _multiget(column_family, [key], column, sub_column, 1, '', '', false, [:consistency])[key] else _multiget(column_family, [key], nil, nil, 1, '', '', false, [:consistency])[key] end ![{}, nil].include?(result) end |
#flush_batch(options) ⇒ Object
Send the batch queue to the server
903 904 905 906 907 908 909 910 911 912 913 914 915 |
# File 'lib/cassandra/cassandra.rb', line 903 def flush_batch() compacted_map,seen_clevels = compact_mutations! clevel = if [:consistency] != nil # Override any clevel from individual mutations if [:consistency] elsif seen_clevels.length > 1 # Cannot choose which CLevel to use if there are several ones raise "Multiple consistency levels used in the batch, and no override...cannot pick one" else # if no consistency override has been provided but all the clevels in the batch are the same: use that one seen_clevels.first end _mutate(compacted_map,clevel) end |
#get(column_family, key, *columns_and_options) ⇒ Object
Return a hash (actually, a Cassandra::OrderedHash) or a single value representing the element at the column_family:key::[sub_column] path you request.
-
column_family - The column_family that you are inserting into.
-
key - The row key to insert.
-
column - Either a single super_column or single column.
-
sub_column - A single sub_column to select.
-
options - Valid options are:
-
:count - The number of columns requested to be returned.
-
:start - The starting value for selecting a range of columns.
-
:finish - The final value for selecting a range of columns.
-
:reversed - If set to true the results will be returned in
reverse order.
-
:consistency - Uses the default read consistency if none specified.
-
615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 |
# File 'lib/cassandra/cassandra.rb', line 615 def get(column_family, key, *) if column_family.is_a?(CassandraThrift::ColumnParent) column = .first consistency = .last.is_a?(Hash) ? (.last[:consistency] || READ_DEFAULTS[:consistency]) : READ_DEFAULTS[:consistency] get_slice(column_family, key, column, nil, nil, nil, nil, consistency) else column_family, column, sub_column, = extract_and_validate_params(column_family, key, , READ_DEFAULTS) if column get_value column_family, key, column, [:consistency] else get_slice(column_family, key, column, [:start], [:finish], [:count], [:reversed], [:consistency]) end end end |
#get_columns(column_family, key, *columns_and_options) ⇒ Object
Return a hash of column value pairs for the path you request.
-
column_family - The column_family that you are inserting into.
-
key - The row key to insert.
-
columns - Either a single super_column or a list of columns.
-
sub_columns - The list of sub_columns to select.
-
options - Valid options are:
-
:consistency - Uses the default read consistency if none specified.
-
554 555 556 557 558 |
# File 'lib/cassandra/cassandra.rb', line 554 def get_columns(column_family, key, *) column_family, columns, sub_columns, = extract_and_validate_params(column_family, key, , READ_DEFAULTS) _get_columns(column_family, key, columns, sub_columns, [:consistency]) end |
#get_columns_as_hash(column_family, key, columns, options = nil) ⇒ Object
Return a hash of column value pairs for the path you request.
-
column_family - The column_family that you are inserting into.
-
key - The row key to insert.
-
columns - Either a single super_column or a list of columns.
-
sub_columns - The list of sub_columns to select.
-
options - Valid options are:
-
:consistency - Uses the default read consistency if none specified.
-
570 571 572 573 574 575 576 577 |
# File 'lib/cassandra/cassandra.rb', line 570 def get_columns_as_hash(column_family, key, columns, =nil) consistency = ? ([:consistency] || READ_DEFAULTS[:consistency]) : READ_DEFAULTS[:consistency] if column_family.is_a?(CassandraThrift::ColumnParent) _get_columns_as_hash(column_family, key, columns, nil, consistency) else _get_columns_as_hash(column_family, key, columns, nil, consistency) end end |
#get_indexed_slices(column_family, index_clause, *columns_and_options) ⇒ Object
This method is used to query a secondary index with a set of provided search parameters.
Please note that you can either specify a CassandraThrift::IndexClause or an array of hashes with the format as below.
-
column_family - The Column Family this operation will be run on.
-
index_clause - This can either be a CassandraThrift::IndexClause or an array of hashes with the following keys:
-
:column_name - Column to be compared
-
:value - Value to compare against
-
:comparison - Type of comparison to do.
-
-
options
-
:key_count - Set maximum number of rows to return. (Only works if CassandraThrift::IndexClause is not passed in.)
-
:start_key - Set starting row key for search. (Only works if CassandraThrift::IndexClause is not passed in.)
-
:consistency
-
TODO: Supercolumn support.
1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 |
# File 'lib/cassandra/cassandra.rb', line 1024 def get_indexed_slices(column_family, index_clause, *) return false if Cassandra.VERSION.to_f < 0.7 column_family, columns, _, = extract_and_validate_params(column_family, [], , READ_DEFAULTS.merge(:key_count => 100, :start_key => nil, :key_start => nil)) start_key = [:start_key] || [:key_start] || "" if index_clause.class != CassandraThrift::IndexClause index_expressions = index_clause.collect do |expression| create_index_expression(expression[:column_name], expression[:value], expression[:comparison]) end index_clause = create_index_clause(index_expressions, start_key, [:key_count]) end key_slices = _get_indexed_slices(column_family, index_clause, columns, [:count], [:start], [:finish], [:reversed], [:consistency]) key_slices.inject(OrderedHash.new) {|h, key_slice| h[key_slice.key] = key_slice.columns; h } end |
#get_range(column_family, options = {}, &blk) ⇒ Object
Return an Cassandra::OrderedHash containing the columns specified for the given range of keys in the column_family you request.
This method is just a convenience wrapper around Cassandra#get_range_single and Cassandra#get_range_batch. If :key_size, :batch_size, or a block is passed in Cassandra#get_range_batch will be called. Otherwise Cassandra#get_range_single will be used.
The start_key and finish_key parameters are only useful for iterating of all records as is done in the Cassandra#each and Cassandra#each_key methods if you are using the RandomPartitioner.
If the table is partitioned with OrderPreservingPartitioner you may use the start_key and finish_key params to select all records with the same prefix value.
If a block is passed in we will yield the row key and columns for each record returned.
Please note that Cassandra returns a row for each row that has existed in the system since gc_grace_seconds. This is because deleted row keys are marked as deleted, but left in the system until the cluster has had resonable time to replicate the deletion. This function attempts to suppress deleted rows (actually any row returned without columns is suppressed).
Please note that when enabling the :reversed option, :start and :finish should be swapped (e.g. reversal happens before selecting the range).
-
column_family - The column_family that you are inserting into.
-
options - Valid options are:
-
:start_key - The starting value for selecting a range of keys (only useful with OPP).
-
:finish_key - The final value for selecting a range of keys (only useful with OPP).
-
:key_count - The total number of keys to return from the query. (see note regarding deleted records)
-
:batch_size - The maximum number of keys to return per query. If specified will loop until :key_count is obtained or all records have been returned.
-
:columns - A list of columns to return.
-
:count - The number of columns requested to be returned.
-
:start - The starting value for selecting a range of columns.
-
:finish - The final value for selecting a range of columns.
-
:reversed - If set to true the results will be returned in reverse order.
-
:consistency - Uses the default read consistency if none specified.
-
742 743 744 745 746 747 748 |
# File 'lib/cassandra/cassandra.rb', line 742 def get_range(column_family, = {}, &blk) if block_given? || [:key_count] || [:batch_size] get_range_batch(column_family, , &blk) else get_range_single(column_family, , &blk) end end |
#get_range_batch(column_family, options = {}) ⇒ Object
Return an Cassandra::OrderedHash containing the columns specified for the given range of keys in the column_family you request.
If a block is passed in we will yield the row key and columns for each record returned and return a nil value instead of a Cassandra::OrderedHash.
See Cassandra#get_range for more details.
792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 |
# File 'lib/cassandra/cassandra.rb', line 792 def get_range_batch(column_family, = {}) batch_size = .delete(:batch_size) || 100 count = .delete(:key_count) result = (!block_given? && {}) || nil num_results = 0 [:start_key] ||= '' last_key = nil while count.nil? || count > num_results res = get_range_single(column_family, .merge!(:start_key => last_key || [:start_key], :key_count => batch_size, :return_empty_rows => true )) break if res.keys.last == last_key res.each do |key, columns| next if last_key == key next if num_results == count unless columns == {} if block_given? yield key, columns else result[key] = columns end num_results += 1 end last_key = key end end result end |
#get_range_keys(column_family, options = {}) ⇒ Object
Return an Array containing all of the keys within a given range.
This method just calls Cassandra#get_range and returns the row keys for the records returned.
See Cassandra#get_range for options.
848 849 850 |
# File 'lib/cassandra/cassandra.rb', line 848 def get_range_keys(column_family, = {}) get_range(column_family,.merge!(:count => 1)).keys end |
#get_range_single(column_family, options = {}) ⇒ Object
Return an Cassandra::OrderedHash containing the columns specified for the given range of keys in the column_family you request.
See Cassandra#get_range for more details.
756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 |
# File 'lib/cassandra/cassandra.rb', line 756 def get_range_single(column_family, = {}) return_empty_rows = .delete(:return_empty_rows) || false column_family, _, _, = extract_and_validate_params(column_family, "", [], READ_DEFAULTS.merge(:start_key => '', :finish_key => '', :key_count => 100, :columns => nil, :reversed => false ) ) results = _get_range( column_family, [:start_key].to_s, [:finish_key].to_s, [:key_count], [:columns], [:start].to_s, [:finish].to_s, [:count], [:consistency], [:reversed] ) multi_key_slices_to_hash(column_family, results, return_empty_rows) end |
#get_slice(column_family, key, column, start, finish, count, reversed, consistency, opts = nil) ⇒ Object
Selecting a slice of a column
1048 1049 1050 |
# File 'lib/cassandra/cassandra.rb', line 1048 def get_slice(column_family, key, column, start, finish, count, reversed, consistency, opts=nil) _get_slice(column_family, key, column, start, finish, count, reversed, consistency) end |
#get_value(column_family, key, column, consistency) ⇒ Object
Returns the value of the column
635 636 637 |
# File 'lib/cassandra/cassandra.rb', line 635 def get_value(column_family, key, column, consistency) _get(column_family, key, column, consistency) end |
#insert(column_family, key, hash, options = {}) ⇒ Object
This is the main method used to insert rows into cassandra. If the column_family that you are inserting into is a SuperColumnFamily then the hash passed in should be a nested hash, otherwise it should be a flat hash.
This method can also be called while in batch mode. If in batch mode then we queue up the mutations (an insert in this case) and pass them to cassandra in a single batch at the end of the block.
-
column_family - The column_family that you are inserting into.
-
key - The row key to insert.
-
hash - The columns or super columns to insert.
-
options - Valid options are:
-
:timestamp - Uses the current time if none specified.
-
:consistency - Uses the default write consistency if none specified.
-
:ttl - If specified this is the number of seconds after the insert that this value will be available.
-
445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 |
# File 'lib/cassandra/cassandra.rb', line 445 def insert(column_family, key, hash, = {}) column_family, _, _, = extract_and_validate_params(column_family, key, [], WRITE_DEFAULTS) = [:timestamp] || Time.stamp mutation_map = if is_super(column_family) { key => { column_family => hash.collect{|k,v| _super_insert_mutation(column_family, k, v, , [:ttl]) } } } else { key => { column_family => hash.collect{|k,v| _standard_insert_mutation(column_family, k, v, , [:ttl])} } } end @batch ? @batch << [mutation_map, [:consistency]] : _mutate(mutation_map, [:consistency]) end |
#inspect ⇒ Object
134 135 136 137 138 |
# File 'lib/cassandra/cassandra.rb', line 134 def inspect "#<Cassandra:#{object_id}, @keyspace=#{keyspace.inspect}, @schema={#{ Array(schema(false).cf_defs).map {|cfdef| ":#{cfdef.name} => #{cfdef.column_type}"}.join(', ') }}, @servers=#{servers.inspect}>" end |
#keyspaces ⇒ Object
Returns an array of available keyspaces.
155 156 157 158 159 |
# File 'lib/cassandra/cassandra.rb', line 155 def keyspaces return false if Cassandra.VERSION.to_f < 0.7 client.describe_keyspaces.to_a.collect {|ksdef| ksdef.name } end |
#login!(username, password) ⇒ Object
Issues a login attempt using the username and password specified.
-
username
-
password
122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/cassandra/cassandra.rb', line 122 def login!(username, password) request = CassandraThrift::AuthenticationRequest.new request.credentials = {'username' => username, 'password' => password} ret = client.login(request) # To avoid a double login on the initial connect, we set # @auth_request after the first successful login. # @auth_request = request ret end |
#multi_count_columns(column_family, keys, *options) ⇒ Object
Multi-key version of Cassandra#count_columns. Please note that this queries the server for each key passed in.
Supports same parameters as Cassandra#count_columns.
-
column_family - The column_family that you are inserting into.
-
key - The row key to insert.
-
columns - Either a single super_column or a list of columns.
-
sub_columns - The list of sub_columns to select.
-
options - Valid options are:
-
:consistency - Uses the default read consistency if none specified.
-
FIXME: Not real multi; needs server support
540 541 542 |
# File 'lib/cassandra/cassandra.rb', line 540 def multi_count_columns(column_family, keys, *) OrderedHash[*keys.map { |key| [key, count_columns(column_family, key, *)] }._flatten_once] end |
#multi_get(column_family, keys, *columns_and_options) ⇒ Object
Multi-key version of Cassandra#get.
This method allows you to select multiple rows with a single query. If a key that is passed in doesn’t exist an empty hash will be returned.
Supports the same parameters as Cassandra#get.
-
column_family - The column_family that you are inserting into.
-
keys - An array of keys to select.
-
column - Either a single super_column or a single column.
-
sub_column - A single ub_columns to select.
-
options - Valid options are:
-
:count - The number of columns requested to be returned.
-
:start - The starting value for selecting a range of columns.
-
:finish - The final value for selecting a range of columns.
-
:reversed - If set to true the results will be returned in reverse order.
-
:consistency - Uses the default read consistency if none specified.
-
659 660 661 662 663 664 665 666 667 668 669 |
# File 'lib/cassandra/cassandra.rb', line 659 def multi_get(column_family, keys, *) column_family, column, sub_column, = extract_and_validate_params(column_family, keys, , READ_DEFAULTS) hash = _multiget(column_family, keys, column, sub_column, [:count], [:start], [:finish], [:reversed], [:consistency]) # Restore order ordered_hash = OrderedHash.new keys.each { |key| ordered_hash[key] = hash[key] || (OrderedHash.new if is_super(column_family) and !sub_column) } ordered_hash end |
#multi_get_columns(column_family, keys, *columns_and_options) ⇒ Object
Multi-key version of Cassandra#get_columns. Please note that this queries the server for each key passed in.
Supports same parameters as Cassandra#get_columns
-
column_family - The column_family that you are inserting into.
-
key - The row key to insert.
-
columns - Either a single super_column or a list of columns.
-
sub_columns - The list of sub_columns to select.
-
options - Valid options are:
-
:consistency - Uses the default read consistency if none specified.
-
592 593 594 595 596 |
# File 'lib/cassandra/cassandra.rb', line 592 def multi_get_columns(column_family, keys, *) column_family, columns, sub_columns, = extract_and_validate_params(column_family, keys, , READ_DEFAULTS) _multi_get_columns(column_family, keys, columns, sub_columns, [:consistency]) end |
#partitioner ⇒ Object
Returns a string identifying which partitioner is in use by the current cluster. Typically, this will be RandomPartitioner, but it could be OrderPreservingPartioner as well.
Please note that this only works on version 0.7.0 and higher.
233 234 235 236 237 |
# File 'lib/cassandra/cassandra.rb', line 233 def partitioner return false if Cassandra.VERSION.to_f < 0.7 client.describe_partitioner() end |
#remove(column_family, key, *columns_and_options) ⇒ Object
This method is used to delete (actually marking them as deleted with a tombstone) rows, columns, or super columns depending on the parameters passed. If only a key is passed the entire row will be marked as deleted. If a column name is passed in that column will be deleted.
This method can also be used in batch mode. If in batch mode then we queue up the mutations (a deletion in this case)
-
column_family - The column_family that you are inserting into.
-
key - The row key to insert.
-
columns - Either a single super_column or a list of columns.
-
sub_columns - The list of sub_columns to select.
-
options - Valid options are:
-
:timestamp - Uses the current time if none specified.
-
:consistency - Uses the default write consistency if none specified.
-
TODO: we could change this function or add another that support multi-column removal (by list or predicate)
486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 |
# File 'lib/cassandra/cassandra.rb', line 486 def remove(column_family, key, *) column_family, column, sub_column, = extract_and_validate_params(column_family, key, , WRITE_DEFAULTS) if @batch mutation_map = { key => { column_family => [ _delete_mutation(column_family, column, sub_column, [:timestamp]|| Time.stamp) ] } } @batch << [mutation_map, [:consistency]] else # Let's continue using the 'remove' thrift method...not sure about the implications/performance of using the mutate instead # Otherwise we coul get use the mutation_map above, and do _mutate(mutation_map, options[:consistency]) args = {:column_family => column_family} columns = is_super(column_family) ? {:super_column => column, :column => sub_column} : {:column => column} column_path = CassandraThrift::ColumnPath.new(args.merge(columns)) _remove(key, column_path, [:timestamp] || Time.stamp, [:consistency]) end end |
#rename_column_family(old_name, new_name) ⇒ Object
Rename a column family. Returns the new schema id.
-
old_name - The current column_family name.
-
new_name - The desired column_family name.
305 306 307 308 309 310 311 312 313 314 315 |
# File 'lib/cassandra/cassandra.rb', line 305 def rename_column_family(old_name, new_name) return false if Cassandra.VERSION.to_f != 0.7 begin res = client.system_rename_column_family(old_name, new_name) rescue CassandraThrift::TimedOutException => te puts "Timed out: #{te.inspect}" end @schema = nil res end |
#rename_keyspace(old_name, new_name) ⇒ Object
Renames keyspace.
-
old_name - Current keyspace name.
-
new_name - Desired keyspace name.
Returns the new schema id
378 379 380 381 382 383 384 385 386 387 388 389 390 391 |
# File 'lib/cassandra/cassandra.rb', line 378 def rename_keyspace(old_name, new_name) return false if Cassandra.VERSION.to_f < 0.7 begin res = client.system_rename_keyspace(old_name, new_name) rescue CassandraThrift::TimedOutException => toe puts "Timed out: #{toe.inspect}" rescue Thrift::TransportException => te puts "Timed out: #{te.inspect}" end keyspace = new_name if old_name.eql?(@keyspace) @keyspaces = nil res end |
#requires_reloading? ⇒ Boolean
1146 1147 1148 |
# File 'lib/cassandra/cassandra.rb', line 1146 def requires_reloading? true end |
#ring ⇒ Object
Returns an array of CassandraThrift::TokenRange objects indicating which servers make up the current ring. What their start and end tokens are, and their list of endpoints.
Please note that this only works on version 0.7.0 and higher.
221 222 223 224 225 |
# File 'lib/cassandra/cassandra.rb', line 221 def ring return false if Cassandra.VERSION.to_f < 0.7 client.describe_ring(@keyspace) end |
#run_callbacks(method) ⇒ Object
method required by ActiveRecord::ConnectionAdapters::ConnectionPool
1136 1137 |
# File 'lib/cassandra/cassandra.rb', line 1136 def run_callbacks(method) end |
#schema_agreement? ⇒ Boolean
This returns true if all servers are in agreement on the schema.
Please note that this only works on version 0.7.0 and higher.
189 190 191 192 193 |
# File 'lib/cassandra/cassandra.rb', line 189 def schema_agreement? return false if Cassandra.VERSION.to_f < 0.7 client.describe_schema_versions().length == 1 end |
#update_column_family(cf_def) ⇒ Object
Update the column family based on the passed in definition.
320 321 322 323 324 325 326 327 328 329 330 |
# File 'lib/cassandra/cassandra.rb', line 320 def update_column_family(cf_def) return false if Cassandra.VERSION.to_f < 0.7 begin res = client.system_update_column_family(cf_def) rescue CassandraThrift::TimedOutException => te puts "Timed out: #{te.inspect}" end @schema = nil res end |
#update_keyspace(ks_def) ⇒ Object
Update the keyspace using the passed in keyspace definition.
396 397 398 399 400 401 402 403 404 405 406 407 408 |
# File 'lib/cassandra/cassandra.rb', line 396 def update_keyspace(ks_def) return false if Cassandra.VERSION.to_f < 0.7 begin res = client.system_update_keyspace(ks_def) rescue CassandraThrift::TimedOutException => toe puts "Timed out: #{toe.inspect}" rescue Thrift::TransportException => te puts "Timed out: #{te.inspect}" end @keyspaces = nil res end |
#verify! ⇒ Object
1139 1140 |
# File 'lib/cassandra/cassandra.rb', line 1139 def verify! end |