Class: Cassandra

Inherits:
Object
  • Object
show all
Includes:
Columns, Helpers, Protocol
Defined in:
lib/cassandra/cassandra.rb,
lib/cassandra.rb,
lib/cassandra/0.6.rb,
lib/cassandra/0.7.rb,
lib/cassandra/0.8.rb,
lib/cassandra/1.0.rb,
lib/cassandra/1.1.rb,
lib/cassandra/long.rb,
lib/cassandra/batch.rb,
lib/cassandra/columns.rb,
lib/cassandra/helpers.rb,
lib/cassandra/keyspace.rb,
lib/cassandra/protocol.rb,
lib/cassandra/composite.rb,
lib/cassandra/constants.rb,
lib/cassandra/comparable.rb,
lib/cassandra/0.6/columns.rb,
lib/cassandra/0.7/columns.rb,
lib/cassandra/0.8/columns.rb,
lib/cassandra/0.6/protocol.rb,
lib/cassandra/0.7/protocol.rb,
lib/cassandra/0.8/protocol.rb,
lib/cassandra/ordered_hash.rb,
lib/cassandra/0.6/cassandra.rb,
lib/cassandra/0.7/cassandra.rb,
lib/cassandra/0.8/cassandra.rb,
lib/cassandra/column_family.rb,
lib/cassandra/dynamic_composite.rb,
lib/cassandra/mock.rb

Overview

OrderedHash is namespaced to prevent conflicts with other implementations

Defined Under Namespace

Modules: Columns, Consistency, Constants, Helpers, Protocol Classes: AccessError, Batch, ColumnFamily, Comparable, Composite, DynamicComposite, Keyspace, Long, Mock, OrderedHash, OrderedHashInt

Constant Summary collapse

# Default options merged into write calls such as #insert and #add.
# Left unfrozen on purpose: #default_write_consistency= replaces the
# :consistency entry in place.
WRITE_DEFAULTS = {
  :count       => 1000,
  :timestamp   => nil,
  :consistency => Consistency::ONE,
  :ttl         => nil
}
# Default options merged into read calls such as #get and #get_range.
# Left unfrozen on purpose: #default_read_consistency= replaces the
# :consistency entry in place.
READ_DEFAULTS = {
  :count       => 100,
  :start       => nil,
  :finish      => nil,
  :reversed    => false,
  :consistency => Consistency::ONE
}
# Baseline thrift client settings; #initialize merges the caller's
# thrift_client_options over these.
THRIFT_DEFAULTS = {
  :transport_wrapper   => Thrift::FramedTransport,
  :thrift_client_class => ThriftClient
}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Helpers

#extract_and_validate_params, #s_map

Methods included from Columns

#_standard_counter_mutation, #_super_counter_mutation

Constructor Details

#initialize(keyspace, servers = "127.0.0.1:9160", thrift_client_options = {}) ⇒ Cassandra

Create a new Cassandra instance and open the connection.



75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/cassandra/cassandra.rb', line 75

# Build a Cassandra instance for +keyspace+.
#
# * keyspace - Name of the keyspace to work with.
# * servers - A single "host:port" string or an array of them.
# * thrift_client_options - Options merged over THRIFT_DEFAULTS. A nil or
#   missing :transport_wrapper falls back to Cassandra.DEFAULT_TRANSPORT_WRAPPER.
#
# This body only records configuration and resets the per-column-family
# caches; it makes no client call itself.
def initialize(keyspace, servers = "127.0.0.1:9160", thrift_client_options = {})
  @is_super = {}
  @column_name_class = {}
  @sub_column_name_class = {}
  @column_name_maker = {}
  @sub_column_name_maker = {}
  @auto_discover_nodes = true
  # Work on a copy so the caller's hash is never modified (and may be frozen).
  client_options = thrift_client_options.dup
  client_options[:transport_wrapper] ||= Cassandra.DEFAULT_TRANSPORT_WRAPPER
  @thrift_client_options = THRIFT_DEFAULTS.merge(client_options)
  @thrift_client_class = @thrift_client_options[:thrift_client_class]
  @keyspace = keyspace
  @servers = Array(servers)
end

Instance Attribute Details

#auth_requestObject (readonly)

Returns the value of attribute auth_request.



68
69
70
# File 'lib/cassandra/cassandra.rb', line 68

# Reader for @auth_request, the authentication request that #login!
# stores after sending it.
def auth_request
  @auth_request
end

#keyspaceObject

Returns the value of attribute keyspace.



68
69
70
# File 'lib/cassandra/cassandra.rb', line 68

# Reader for @keyspace, the keyspace name given to #initialize.
def keyspace
  @keyspace
end

#serversObject (readonly)

Returns the value of attribute servers.



68
69
70
# File 'lib/cassandra/cassandra.rb', line 68

# Reader for @servers; #initialize stores the servers argument wrapped
# with Array(), so this is always an array.
def servers
  @servers
end

#thrift_client_classObject (readonly)

Returns the value of attribute thrift_client_class.



68
69
70
# File 'lib/cassandra/cassandra.rb', line 68

# Reader for @thrift_client_class, taken from the :thrift_client_class
# entry of the merged thrift client options in #initialize.
def thrift_client_class
  @thrift_client_class
end

#thrift_client_optionsObject (readonly)

Returns the value of attribute thrift_client_options.



68
69
70
# File 'lib/cassandra/cassandra.rb', line 68

# Reader for @thrift_client_options: THRIFT_DEFAULTS merged with the
# options passed to #initialize.
def thrift_client_options
  @thrift_client_options
end

Class Method Details

.DEFAULT_TRANSPORT_WRAPPERObject



70
71
72
# File 'lib/cassandra/cassandra.rb', line 70

# Transport wrapper class that #initialize falls back to when the caller
# supplies no :transport_wrapper option.
def self.DEFAULT_TRANSPORT_WRAPPER
  Thrift::FramedTransport
end

.VERSIONObject



2
3
4
# File 'lib/cassandra/0.6.rb', line 2

# Version string for this build (this definition comes from
# lib/cassandra/0.6.rb). Several instance methods compare its float value
# against 0.7 and return false when it is lower.
def self.VERSION
  "0.6"
end

Instance Method Details

#add(column_family, key, value, *columns_and_options) ⇒ Object

Add a value to the counter in cf:key:super column:column



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/cassandra/0.8/cassandra.rb', line 6

# Add +value+ to a counter column.
#
# * column_family - The column family holding the counter.
# * key - The row key.
# * value - The amount to add.
# * columns_and_options - The column (and sub column for a super column
#   family), optionally followed by an options hash validated against
#   WRITE_DEFAULTS.
#
# While a batch is open the mutation is queued on it; otherwise it is
# sent right away with the resolved :consistency.
def add(column_family, key, value, *columns_and_options)
  column_family, column, sub_column, options = extract_and_validate_params(column_family, key, columns_and_options, WRITE_DEFAULTS)

  counter_mutation =
    if is_super(column_family)
      _super_counter_mutation(column_family, column, sub_column, value)
    else
      _standard_counter_mutation(column_family, column, value)
    end
  mutation_map = { key => { column_family => [counter_mutation] } }

  if @batch
    @batch << [mutation_map, options[:consistency]]
  else
    _mutate(mutation_map, options[:consistency])
  end
end

#add_column_family(cf_def) ⇒ Object

Creates a new column family from the passed in Cassandra::ColumnFamily instance, and returns the schema id.



269
270
271
272
273
274
275
276
277
278
279
# File 'lib/cassandra/cassandra.rb', line 269

# Create a column family from +cf_def+ and return the value the client
# reports for the schema change.
#
# Returns false when Cassandra.VERSION is below 0.7. A
# CassandraThrift::TimedOutException is printed rather than raised, in
# which case nil is returned. The cached @schema is cleared either way.
def add_column_family(cf_def)
  return false if Cassandra.VERSION.to_f < 0.7

  schema_id =
    begin
      client.system_add_column_family(cf_def)
    rescue CassandraThrift::TimedOutException => timeout
      puts "Timed out: #{timeout.inspect}"
    end
  @schema = nil
  schema_id
end

#add_keyspace(ks_def) ⇒ Object

Add keyspace using the passed in keyspace definition.

Returns the new schema id.



336
337
338
339
340
341
342
343
344
345
346
347
348
# File 'lib/cassandra/cassandra.rb', line 336

# Create a keyspace from +ks_def+ and return the value the client reports
# for the schema change.
#
# Returns false when Cassandra.VERSION is below 0.7. Timeouts and
# transport errors are printed rather than raised, in which case nil is
# returned. The cached @keyspaces is cleared either way.
def add_keyspace(ks_def)
  return false if Cassandra.VERSION.to_f < 0.7

  schema_id =
    begin
      client.system_add_keyspace(ks_def)
    rescue CassandraThrift::TimedOutException => timeout
      puts "Timed out: #{timeout.inspect}"
    rescue Thrift::TransportException => transport_error
      puts "Timed out: #{transport_error.inspect}"
    end
  @keyspaces = nil
  schema_id
end

#batch(options = {}) ⇒ Object

Open a batch operation and yield self. Inserts and deletes will be queued until the block closes, and then sent atomically to the server. Supports the :consistency option, which overrides the consistency set in the individual commands.



852
853
854
855
856
857
858
859
860
861
862
# File 'lib/cassandra/cassandra.rb', line 852

# Open a batch, yield self, then flush the queued mutations.
#
# * options - Passed to Cassandra::Batch.new and validated against
#   WRITE_DEFAULTS (the first column family of the schema is used only to
#   satisfy the validator); the validated options go to #flush_batch.
#
# @batch is reset to nil when the method exits, including on error.
def batch(options = {})
  @batch = Cassandra::Batch.new(self, options)

  first_cf_name = schema.cf_defs.first.name
  _, _, _, options = extract_and_validate_params(first_cf_name, "", [options], WRITE_DEFAULTS)

  yield(self)
  flush_batch(options)
ensure
  @batch = nil
end

#clear_keyspace!(options = {}) ⇒ Object

Remove all rows in the keyspace. Supports options :consistency and :timestamp. FIXME May not currently delete all records without multiple calls. Waiting for ranged remove support in Cassandra.



259
260
261
262
263
# File 'lib/cassandra/cassandra.rb', line 259

# Truncate every column family listed in the current schema.
# Returns false when Cassandra.VERSION is below 0.7.
def clear_keyspace!
  return false if Cassandra.VERSION.to_f < 0.7

  schema.cf_defs.each do |cf_def|
    truncate!(cf_def.name)
  end
end

#cluster_nameObject

Returns the string name specified for the cluster.

Please note that this only works on version 0.7.0 and higher.



208
209
210
211
212
# File 'lib/cassandra/cassandra.rb', line 208

# Name of the cluster as reported by the client; the first truthy answer
# is cached in @cluster_name. Returns false when Cassandra.VERSION is
# below 0.7.
def cluster_name
  return false if Cassandra.VERSION.to_f < 0.7
  return @cluster_name if @cluster_name

  @cluster_name = client.describe_cluster_name
end

#column_familiesObject

Return a hash of column_family definitions indexed by their names



163
164
165
166
167
# File 'lib/cassandra/cassandra.rb', line 163

# Hash of column family definitions from the schema, keyed by name.
# Returns false when Cassandra.VERSION is below 0.7.
def column_families
  return false if Cassandra.VERSION.to_f < 0.7

  definitions_by_name = {}
  schema.cf_defs.each { |cf_def| definitions_by_name[cf_def.name] = cf_def }
  definitions_by_name
end

#count_columns(column_family, key, *columns_and_options) ⇒ Object

Count the columns for the provided parameters.

  • column_family - The column_family that you are inserting into.

  • key - The row key to insert.

  • columns - Either a single super_column or a list of columns.

  • sub_columns - The list of sub_columns to select.

  • options - Valid options are:

    • :start - The column name to start from.

    • :stop - The column name to stop at.

    • :count - The maximum count of columns to return. (By default cassandra will count up to 100 columns)

    • :consistency - Uses the default read consistency if none specified.



523
524
525
526
527
# File 'lib/cassandra/cassandra.rb', line 523

# Count the columns under column_family:key (optionally inside a super
# column).
#
# * column_family - The column family to count in.
# * key - The row key.
# * columns_and_options - An optional super column, then an options hash
#   validated against READ_DEFAULTS (:start, :finish, :count, :consistency).
#
# The upper bound is read from :finish, the key READ_DEFAULTS declares and
# the other read methods use. The previous code read only :stop, which
# READ_DEFAULTS does not define; :stop is still used when :finish is unset.
def count_columns(column_family, key, *columns_and_options)
  column_family, super_column, _, options =
    extract_and_validate_params(column_family, key, columns_and_options, READ_DEFAULTS)
  finish = options[:finish] || options[:stop]
  _count_columns(column_family, key, super_column, options[:start], finish, options[:count], options[:consistency])
end

#count_range(column_family, options = {}) ⇒ Object

Count all rows in the column_family you request.

This method just calls Cassandra#get_range_keys and returns the number of records returned.

See Cassandra#get_range for options.



800
801
802
# File 'lib/cassandra/cassandra.rb', line 800

# Number of row keys that #get_range_keys returns for the given options.
def count_range(column_family, options = {})
  row_keys = get_range_keys(column_family, options)
  row_keys.length
end

#create_index(keyspace, column_family, column_name, validation_class) ⇒ Object

Create secondary index.

  • keyspace

  • column_family

  • column_name

  • validation_class



889
890
891
892
893
894
895
896
897
898
899
900
901
902
# File 'lib/cassandra/cassandra.rb', line 889

# Create a secondary (KEYS) index on +column_name+.
#
# * keyspace - Keyspace whose definition is looked up.
# * column_family - Name of the column family to index.
# * column_name - Column to index.
# * validation_class - Short marshal class name; it is appended to
#   "org.apache.cassandra.db.marshal.".
#
# Returns false when Cassandra.VERSION is below 0.7, nil when the column
# family is unknown or the column already has a metadata entry, and
# otherwise the result of #update_column_family.
def create_index(keyspace, column_family, column_name, validation_class)
  return false if Cassandra.VERSION.to_f < 0.7

  cf_def = client.describe_keyspace(keyspace).cf_defs.find { |x| x.name == column_family }
  return if cf_def.nil?
  # Nothing to do when the column already carries metadata.
  return if cf_def.column_metadata.find { |x| x.name == column_name }

  c_def = CassandraThrift::ColumnDef.new do |cd|
    cd.name             = column_name
    cd.validation_class = "org.apache.cassandra.db.marshal." + validation_class
    cd.index_type       = CassandraThrift::IndexType::KEYS
  end
  cf_def.column_metadata.push(c_def)
  update_column_family(cf_def)
end

#create_index_clause(index_expressions, start = "", count = 100) ⇒ Object Also known as: create_idx_clause

This method takes an array of CassandraThrift::IndexExpression objects and creates a CassandraThrift::IndexClause for use in Cassandra#get_indexed_slices.

  • index_expressions - Array of CassandraThrift::IndexExpressions.

  • start - The starting row key.

  • count - The count of items to be returned



959
960
961
962
963
964
965
966
# File 'lib/cassandra/cassandra.rb', line 959

# Wrap index expressions in a CassandraThrift::IndexClause.
#
# * index_expressions - Array of index expressions.
# * start - The starting row key (default "").
# * count - The count of items to be returned (default 100).
#
# Returns false when Cassandra.VERSION is below 0.7.
def create_index_clause(index_expressions, start = "", count = 100)
  return false if Cassandra.VERSION.to_f < 0.7

  CassandraThrift::IndexClause.new(:start_key => start, :expressions => index_expressions, :count => count)
end

#create_index_expression(column_name, value, comparison) ⇒ Object Also known as: create_idx_expr

This method is mostly used internally by get_indexed_slices to create a CassandraThrift::IndexExpression for the given options.

  • column_name - Column to be compared

  • value - Value to compare against

  • comparison - Type of comparison to do.



929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
# File 'lib/cassandra/cassandra.rb', line 929

# Build a CassandraThrift::IndexExpression.
#
# * column_name - Column to be compared.
# * value - Value to compare against.
# * comparison - nil or one of "EQ"/"eq"/"==", "GTE"/"gte"/">=",
#   "GT"/"gt"/">", "LTE"/"lte"/"<=", "LT"/"lt"/"<". nil means EQ; any
#   other value leaves :op as nil.
#
# Returns false when Cassandra.VERSION is below 0.7.
def create_index_expression(column_name, value, comparison)
  return false if Cassandra.VERSION.to_f < 0.7

  operator =
    case comparison
    when nil, "EQ", "eq", "==" then CassandraThrift::IndexOperator::EQ
    when "GTE", "gte", ">="    then CassandraThrift::IndexOperator::GTE
    when "GT", "gt", ">"       then CassandraThrift::IndexOperator::GT
    when "LTE", "lte", "<="    then CassandraThrift::IndexOperator::LTE
    when "LT", "lt", "<"       then CassandraThrift::IndexOperator::LT
    end

  CassandraThrift::IndexExpression.new(:column_name => column_name, :value => value, :op => operator)
end

#default_read_consistency=(value) ⇒ Object

The initial default consistency is set to ONE, but you can use this method to override the normal default with your specified value. Use this if you do not want to specify a read consistency for each query.



422
423
424
# File 'lib/cassandra/cassandra.rb', line 422

# Replace the :consistency entry of READ_DEFAULTS. The constant is shared,
# so the new default applies wherever READ_DEFAULTS is consulted.
def default_read_consistency=(value)
  READ_DEFAULTS.store(:consistency, value)
end

#default_write_consistency=(value) ⇒ Object

The initial default consistency is set to ONE, but you can use this method to override the normal default with your specified value. Use this if you do not want to specify a write consistency for each insert statement.



413
414
415
# File 'lib/cassandra/cassandra.rb', line 413

# Replace the :consistency entry of WRITE_DEFAULTS. The constant is shared,
# so the new default applies wherever WRITE_DEFAULTS is consulted.
def default_write_consistency=(value)
  WRITE_DEFAULTS.store(:consistency, value)
end

#disable_node_auto_discovery!Object

This is primarily helpful when the cassandra cluster is communicating internally on a different ip address than what you are using to connect. A prime example of this would be when using EC2 to host a cluster. Typically, the cluster would be communicating over the local ip addresses issued by Amazon, but any clients connecting from outside EC2 would need to use the public ip.



101
102
103
# File 'lib/cassandra/cassandra.rb', line 101

# Turn off the @auto_discover_nodes flag, which #initialize sets to true.
def disable_node_auto_discovery!
  @auto_discover_nodes = false
end

#disconnect!Object

Disconnect the current client connection.



108
109
110
111
112
113
# File 'lib/cassandra/cassandra.rb', line 108

# Disconnect the current client, if there is one, and forget it.
# Always returns nil.
def disconnect!
  return unless @client

  @client.disconnect!
  @client = nil
end

#drop_column_family(column_family) ⇒ Object

Delete the specified column family. Return the new schema id.

  • column_family - The column_family name to drop.



286
287
288
289
290
291
292
293
294
295
296
# File 'lib/cassandra/cassandra.rb', line 286

# Drop the named column family and return the value the client reports
# for the schema change.
#
# Returns false when Cassandra.VERSION is below 0.7. A
# CassandraThrift::TimedOutException is printed rather than raised, in
# which case nil is returned. The cached @schema is cleared either way.
def drop_column_family(column_family)
  return false if Cassandra.VERSION.to_f < 0.7

  schema_id =
    begin
      client.system_drop_column_family(column_family)
    rescue CassandraThrift::TimedOutException => timeout
      puts "Timed out: #{timeout.inspect}"
    end
  @schema = nil
  schema_id
end

#drop_index(keyspace, column_family, column_name) ⇒ Object

Delete secondary index.

  • keyspace

  • column_family

  • column_name



911
912
913
914
915
916
917
918
919
# File 'lib/cassandra/cassandra.rb', line 911

# Delete the secondary index on +column_name+ by removing that column's
# metadata entry and updating the column family.
#
# * keyspace - Keyspace whose definition is looked up.
# * column_family - Name of the column family.
# * column_name - Column whose metadata entry is removed.
#
# Returns false when Cassandra.VERSION is below 0.7, nil when the column
# family is unknown or has no metadata for the column, and otherwise the
# result of #update_column_family.
def drop_index(keyspace, column_family, column_name)
  return false if Cassandra.VERSION.to_f < 0.7

  cf_def = client.describe_keyspace(keyspace).cf_defs.find { |x| x.name == column_family }
  return if cf_def.nil?
  return unless cf_def.column_metadata.find { |x| x.name == column_name }

  cf_def.column_metadata.delete_if { |x| x.name == column_name }
  update_column_family(cf_def)
end

#drop_keyspace(keyspace = @keyspace) ⇒ Object

Deletes keyspace using the passed in keyspace name.

Returns the new schema id.



355
356
357
358
359
360
361
362
363
364
365
366
367
368
# File 'lib/cassandra/cassandra.rb', line 355

# Drop +keyspace+ (the current keyspace by default) and return the value
# the client reports for the schema change.
#
# Returns false when Cassandra.VERSION is below 0.7. Timeouts and
# transport errors are printed rather than raised, in which case nil is
# returned. The cached @keyspaces is cleared either way.
#
# NOTE(review): the previous body ended with
#   keyspace = "system" if keyspace.eql?(@keyspace)
# which only reassigned the local parameter and had no effect. It has been
# removed; @keyspace is left unchanged exactly as before. If switching to
# the "system" keyspace after dropping the current one was intended, that
# still needs to be implemented.
def drop_keyspace(keyspace=@keyspace)
  return false if Cassandra.VERSION.to_f < 0.7

  begin
    res = client.system_drop_keyspace(keyspace)
  rescue CassandraThrift::TimedOutException => toe
    puts "Timed out: #{toe.inspect}"
  rescue Thrift::TransportException => te
    puts "Timed out: #{te.inspect}"
  end
  @keyspaces = nil
  res
end

#each(column_family, options = {}) ⇒ Object

Iterate through each row in the given column family

This method just calls Cassandra#get_range and yields the key and columns.

See Cassandra#get_range for options.



838
839
840
841
842
# File 'lib/cassandra/cassandra.rb', line 838

# Yield the row key and columns of every row that #get_range_batch
# produces for +column_family+ and +options+.
def each(column_family, options = {})
  get_range_batch(column_family, options) { |row_key, row_columns| yield row_key, row_columns }
end

#each_key(column_family, options = {}) ⇒ Object

Iterate through each key within the given parameters. This function can be used to iterate over each key in the given column family.

This method just calls Cassandra#get_range and yields each row key.

See Cassandra#get_range for options.



824
825
826
827
828
# File 'lib/cassandra/cassandra.rb', line 824

# Yield only the row key of every row that #get_range_batch produces for
# +column_family+ and +options+.
def each_key(column_family, options = {})
  get_range_batch(column_family, options) { |row_key, _columns| yield row_key }
end

#exists?(column_family, key, *columns_and_options) ⇒ Boolean

Return true if the column_family:key::[sub_column] path you request exists.

If passed in only a row key it will query for any columns (limiting to 1) for that row key. If a column is passed in it will query for that specific column/super column.

This method will return true or false.

  • column_family - The column_family that you are inserting into.

  • key - The row key to insert.

  • columns - Either a single super_column or a list of columns.

  • sub_columns - The list of sub_columns to select.

  • options - Valid options are:

    • :consistency - Uses the default read consistency if none specified.

Returns:

  • (Boolean)


652
653
654
655
656
657
658
659
660
661
662
# File 'lib/cassandra/cassandra.rb', line 652

# True when the requested column_family:key[:column[:sub_column]] path
# yields something other than nil or an empty hash.
#
# * column_family - The column family to look in.
# * key - The row key.
# * columns_and_options - Optional column and sub column, then an options
#   hash validated against READ_DEFAULTS (only :consistency is used here).
#
# At most one column is requested. Without a column the whole row is
# probed and any sub column is ignored.
def exists?(column_family, key, *columns_and_options)
  column_family, column, sub_column, options =
    extract_and_validate_params(column_family, key, columns_and_options, READ_DEFAULTS)

  probe_column, probe_sub_column = column ? [column, sub_column] : [nil, nil]
  result = _multiget(column_family, [key], probe_column, probe_sub_column, 1, '', '', false, options[:consistency])[key]

  [{}, nil].none? { |blank| blank == result }
end

#flush_batch(options = {}) ⇒ Object

Send the batch queue to the server



867
868
869
870
871
872
873
874
875
876
877
878
879
# File 'lib/cassandra/cassandra.rb', line 867

# Send the queued batch mutations to the server.
#
# * options - :consistency, when not nil, overrides the levels recorded
#   on the individual mutations.
#
# Without an override the single level seen in the batch is used; a
# RuntimeError is raised when the batch mixes several levels.
def flush_batch(options={})
  compacted_map, seen_clevels = compact_mutations!

  override = options[:consistency]
  if !override.nil?
    # An explicit override wins over whatever the mutations asked for.
    clevel = override
  elsif seen_clevels.length > 1
    # No way to pick between several levels on the caller's behalf.
    raise "Multiple consistency levels used in the batch, and no override...cannot pick one"
  else
    clevel = seen_clevels.first
  end

  _mutate(compacted_map, clevel)
end

#get(column_family, key, *columns_and_options) ⇒ Object

Return a hash (actually, a Cassandra::OrderedHash) or a single value representing the element at the column_family:key::[sub_column] path you request.

  • column_family - The column_family that you are inserting into.

  • key - The row key to insert.

  • column - Either a single super_column or single column.

  • sub_column - A single sub_column to select.

  • options - Valid options are:

    • :count - The number of columns requested to be returned.

    • :start - The starting value for selecting a range of columns.

    • :finish - The final value for selecting a range of columns.

    • :reversed - If set to true the results will be returned in

      reverse order.
      
    • :consistency - Uses the default read consistency if none specified.



599
600
601
# File 'lib/cassandra/cassandra.rb', line 599

# Single-key convenience wrapper: runs #multi_get for +key+ alone and
# returns that key's entry from the result.
def get(column_family, key, *columns_and_options)
  rows = multi_get(column_family, [key], *columns_and_options)
  rows[key]
end

#get_columns(column_family, key, *columns_and_options) ⇒ Object

Return a hash of column value pairs for the path you request.

  • column_family - The column_family that you are inserting into.

  • key - The row key to insert.

  • columns - Either a single super_column or a list of columns.

  • sub_columns - The list of sub_columns to select.

  • options - Valid options are:

    • :consistency - Uses the default read consistency if none specified.



557
558
559
560
561
# File 'lib/cassandra/cassandra.rb', line 557

# Fetch specific columns for one row.
#
# * column_family - The column family to read from.
# * key - The row key.
# * columns_and_options - Columns (or a super column plus sub columns),
#   then an options hash validated against READ_DEFAULTS; only
#   :consistency is forwarded.
def get_columns(column_family, key, *columns_and_options)
  validated = extract_and_validate_params(column_family, key, columns_and_options, READ_DEFAULTS)
  column_family, columns, sub_columns, options = validated

  _get_columns(column_family, key, columns, sub_columns, options[:consistency])
end

#get_indexed_slices(column_family, index_clause, *columns_and_options) ⇒ Object

This method is used to query a secondary index with a set of provided search parameters.

Please note that you can either specify a CassandraThrift::IndexClause or an array of hashes with the format as below.

  • column_family - The Column Family this operation will be run on.

  • index_clause - This can either be a CassandraThrift::IndexClause or an array of hashes with the following keys:

    • :column_name - Column to be compared

    • :value - Value to compare against

    • :comparison - Type of comparison to do.

  • options

    • :key_count - Set maximum number of rows to return. (Only works if CassandraThrift::IndexClause is not passed in.)

    • :start_key - Set starting row key for search. (Only works if CassandraThrift::IndexClause is not passed in.)

    • :consistency

TODO: Supercolumn support.



988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
# File 'lib/cassandra/cassandra.rb', line 988

# Query a secondary index.
#
# * column_family - The column family to query.
# * index_clause - A CassandraThrift::IndexClause, or an array of hashes
#   with :column_name, :value and :comparison keys from which a clause is
#   built.
# * columns_and_options - Optional columns, then an options hash. On top
#   of READ_DEFAULTS it accepts :key_count (default 100) and :start_key /
#   :key_start; these only matter when the clause is built here.
#
# Returns an OrderedHash of row key => columns, or false when
# Cassandra.VERSION is below 0.7.
def get_indexed_slices(column_family, index_clause, *columns_and_options)
  return false if Cassandra.VERSION.to_f < 0.7

  allowed_options = READ_DEFAULTS.merge(:key_count => 100, :start_key => nil, :key_start => nil)
  column_family, columns, _, options =
    extract_and_validate_params(column_family, [], columns_and_options, allowed_options)

  start_key = options[:start_key] || options[:key_start] || ""

  # Anything that is not already an IndexClause is treated as a list of
  # expression hashes.
  unless index_clause.class == CassandraThrift::IndexClause
    expressions = index_clause.map do |spec|
      create_index_expression(spec[:column_name], spec[:value], spec[:comparison])
    end
    index_clause = create_index_clause(expressions, start_key, options[:key_count])
  end

  key_slices = _get_indexed_slices(column_family, index_clause, columns, options[:count], options[:start],
    options[:finish], options[:reversed], options[:consistency])

  rows = OrderedHash.new
  key_slices.each { |key_slice| rows[key_slice.key] = key_slice.columns }
  rows
end

#get_range(column_family, options = {}, &blk) ⇒ Object

Return a Cassandra::OrderedHash containing the columns specified for the given range of keys in the column_family you request.

This method is just a convenience wrapper around Cassandra#get_range_single and Cassandra#get_range_batch. If :key_count, :batch_size, or a block is passed in, Cassandra#get_range_batch will be called. Otherwise Cassandra#get_range_single will be used.

The start_key and finish_key parameters are only useful for iterating over all records, as is done in the Cassandra#each and Cassandra#each_key methods, if you are using the RandomPartitioner.

If the table is partitioned with OrderPreservingPartitioner you may use the start_key and finish_key params to select all records with the same prefix value.

If a block is passed in we will yield the row key and columns for each record returned.

Please note that Cassandra returns a row for each row that has existed in the system since gc_grace_seconds. This is because deleted row keys are marked as deleted, but left in the system until the cluster has had reasonable time to replicate the deletion. This function attempts to suppress deleted rows (actually any row returned without columns is suppressed).

Please note that when enabling the :reversed option, :start and :finish should be swapped (e.g. reversal happens before selecting the range).

  • column_family - The column_family that you are inserting into.

  • options - Valid options are:

    • :start_key - The starting value for selecting a range of keys (only useful with OPP).

    • :finish_key - The final value for selecting a range of keys (only useful with OPP).

    • :key_count - The total number of keys to return from the query. (see note regarding deleted records)

    • :batch_size - The maximum number of keys to return per query. If specified will loop until :key_count is obtained or all records have been returned.

    • :columns - A list of columns to return.

    • :count - The number of columns requested to be returned.

    • :start - The starting value for selecting a range of columns.

    • :finish - The final value for selecting a range of columns.

    • :reversed - If set to true the results will be returned in reverse order.

    • :consistency - Uses the default read consistency if none specified.



706
707
708
709
710
711
712
# File 'lib/cassandra/cassandra.rb', line 706

# Dispatch a range query: #get_range_batch handles the call when a block,
# :key_count or :batch_size is given; otherwise #get_range_single does.
def get_range(column_family, options = {}, &blk)
  wants_batches = block_given? || options[:key_count] || options[:batch_size]
  return get_range_batch(column_family, options, &blk) if wants_batches

  get_range_single(column_family, options, &blk)
end

#get_range_batch(column_family, options = {}) ⇒ Object

Return a Cassandra::OrderedHash containing the columns specified for the given range of keys in the column_family you request.

If a block is passed in we will yield the row key and columns for each record returned and return a nil value instead of a Cassandra::OrderedHash.

See Cassandra#get_range for more details.



756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
# File 'lib/cassandra/cassandra.rb', line 756

# Page through a key range with repeated #get_range_single calls.
#
# * column_family - The column family to read from.
# * options - Options for #get_range_single, plus:
#   * :batch_size - Keys requested per page (default 100).
#   * :key_count - Stop after this many non-empty rows (default: no limit).
#
# With a block, each non-empty row is yielded as (key, columns) and nil is
# returned. Without one, a Hash of key => columns is returned. Rows whose
# columns equal {} are skipped and do not count towards :key_count.
#
# The caller's options hash is left untouched: paging state (:start_key,
# :key_count, :return_empty_rows) is kept on a private copy. Previously
# those keys leaked into the caller's hash and :batch_size/:key_count were
# removed from it, so reusing the hash for a second call gave wrong results.
def get_range_batch(column_family, options = {})
  options       = options.dup
  batch_size    = options.delete(:batch_size) || 100
  count         = options.delete(:key_count)
  result        = (!block_given? && {}) || nil
  num_results   = 0

  options[:start_key] ||= ''
  last_key  = nil

  while count.nil? || count > num_results
    # Each page starts at the last key already seen, so that key comes back
    # again as the first row and is skipped below.
    res = get_range_single(column_family, options.merge!(:start_key => last_key || options[:start_key],
                                                         :key_count => batch_size,
                                                         :return_empty_rows => true
                                                        ))
    # No key beyond the one we already processed: the range is exhausted.
    break if res.keys.last == last_key

    res.each do |key, columns|
      next if last_key == key
      next if num_results == count

      unless columns == {}
        if block_given?
          yield key, columns
        else
          result[key] = columns
        end
        num_results += 1
      end

      last_key = key
    end
  end

  result
end

#get_range_keys(column_family, options = {}) ⇒ Object

Return an Array containing all of the keys within a given range.

This method just calls Cassandra#get_range and returns the row keys for the records returned.

See Cassandra#get_range for options.



812
813
814
# File 'lib/cassandra/cassandra.rb', line 812

# Row keys returned by #get_range for the given options.
#
# :count => 1 is forced so each row carries at most one column. It is
# merged into a new hash; the previous merge! wrote :count => 1 into the
# caller's options hash, which then affected later calls reusing it.
def get_range_keys(column_family, options = {})
  get_range(column_family, options.merge(:count => 1)).keys
end

#get_range_single(column_family, options = {}) ⇒ Object

Return a Cassandra::OrderedHash containing the columns specified for the given range of keys in the column_family you request.

See Cassandra#get_range for more details.



720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
# File 'lib/cassandra/cassandra.rb', line 720

# Run one range query and convert the result with
# #multi_key_slices_to_hash.
#
# * column_family - The column family to read from.
# * options - READ_DEFAULTS keys plus :start_key, :finish_key, :key_count
#   (default 100) and :columns. :return_empty_rows (default false) is
#   handed to #multi_key_slices_to_hash and never reaches option validation.
#
# The options hash is copied first, so removing :return_empty_rows no
# longer alters the caller's hash.
def get_range_single(column_family, options = {})
  options = options.dup
  return_empty_rows = options.delete(:return_empty_rows) || false

  column_family, _, _, options =
    extract_and_validate_params(column_family, "", [options],
                                READ_DEFAULTS.merge(:start_key  => '',
                                                    :finish_key => '',
                                                    :key_count  => 100,
                                                    :columns    => nil,
                                                    :reversed   => false
                                                   )
                               )

  # Key and column bounds are passed as strings; nil becomes "".
  results = _get_range( column_family,
                        options[:start_key].to_s,
                        options[:finish_key].to_s,
                        options[:key_count],
                        options[:columns],
                        options[:start].to_s,
                        options[:finish].to_s,
                        options[:count],
                        options[:consistency],
                        options[:reversed] )

  multi_key_slices_to_hash(column_family, results, return_empty_rows)
end

#insert(column_family, key, hash, options = {}) ⇒ Object

This is the main method used to insert rows into cassandra. If the column_family that you are inserting into is a SuperColumnFamily then the hash passed in should be a nested hash, otherwise it should be a flat hash.

This method can also be called while in batch mode. If in batch mode then we queue up the mutations (an insert in this case) and pass them to cassandra in a single batch at the end of the block.

  • column_family - The column_family that you are inserting into.

  • key - The row key to insert.

  • hash - The columns or super columns to insert.

  • options - Valid options are:

    • :timestamp - Uses the current time if none specified.

    • :consistency - Uses the default write consistency if none specified.

    • :ttl - If specified this is the number of seconds after the insert that this value will be available.



444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
# File 'lib/cassandra/cassandra.rb', line 444

# Insert columns into a row.
#
# * column_family - The column family to write to.
# * key - The row key.
# * hash - Columns to write; one mutation is built per top-level entry,
#   using the super-column builder when the column family is a super
#   column family.
# * options - Validated against WRITE_DEFAULTS:
#   * :timestamp - Defaults to Time.stamp.
#   * :consistency - Consistency level for the write.
#   * :ttl - Passed through to every mutation.
#
# While a batch is open the mutation map is queued on it; otherwise it is
# sent right away.
def insert(column_family, key, hash, options = {})
  column_family, _, _, options = extract_and_validate_params(column_family, key, [options], WRITE_DEFAULTS)

  timestamp = options[:timestamp] || Time.stamp
  ttl = options[:ttl]
  mutations =
    if is_super(column_family)
      hash.collect { |name, sub_columns| _super_insert_mutation(column_family, name, sub_columns, timestamp, ttl) }
    else
      hash.collect { |name, value| _standard_insert_mutation(column_family, name, value, timestamp, ttl) }
    end
  mutation_map = { key => { column_family => mutations } }

  if @batch
    @batch << [mutation_map, options[:consistency]]
  else
    _mutate(mutation_map, options[:consistency])
  end
end

#inspectObject



133
134
135
136
137
# File 'lib/cassandra/cassandra.rb', line 133

# Short description of the instance: object id, keyspace, a
# ":name => column_type" summary of the column families from
# schema(false), and the server list.
def inspect
  cf_summary = Array(schema(false).cf_defs).map { |cfdef| ":#{cfdef.name} => #{cfdef.column_type}" }.join(', ')
  "#<Cassandra:#{object_id}, @keyspace=#{keyspace.inspect}, @schema={#{cf_summary}}, @servers=#{servers.inspect}>"
end

#keyspacesObject

Returns an array of available keyspaces.



154
155
156
157
158
# File 'lib/cassandra/cassandra.rb', line 154

# Names of the keyspaces the client describes, as an array.
# Returns false when Cassandra.VERSION is below 0.7.
def keyspaces
  return false if Cassandra.VERSION.to_f < 0.7

  keyspace_defs = client.describe_keyspaces.to_a
  keyspace_defs.map { |ks_def| ks_def.name }
end

#login!(username, password) ⇒ Object

Issues a login attempt using the username and password specified.

  • username

  • password



121
122
123
124
125
126
127
128
129
130
131
# File 'lib/cassandra/cassandra.rb', line 121

# Issue a login attempt with the given credentials.
#
# * username
# * password
#
# Returns whatever the client's login call returns. @auth_request is only
# recorded once that call has returned, so a failed (raising) login leaves
# it untouched.
def login!(username, password)
  request = CassandraThrift::AuthenticationRequest.new
  request.credentials = {'username' => username, 'password' => password}
  ret = client.login(request)

  # To avoid a double login on the initial connect, we set
  # @auth_request after the first successful login.
  #
  @auth_request = request
  ret
end

#multi_count_columns(column_family, keys, *options) ⇒ Object

Multi-key version of Cassandra#count_columns. Please note that this queries the server for each key passed in.

Supports same parameters as Cassandra#count_columns.

  • column_family - The column_family that you are inserting into.

  • key - The row key to insert.

  • columns - Either a single super_column or a list of columns.

  • sub_columns - The list of sub_columns to select.

  • options - Valid options are:

    • :consistency - Uses the default read consistency if none specified.

FIXME: Not real multi; needs server support



543
544
545
# File 'lib/cassandra/cassandra.rb', line 543

# Multi-key version of #count_columns: runs #count_columns once per key
# and returns an OrderedHash of key => count in the order of +keys+.
def multi_count_columns(column_family, keys, *options)
  key_count_pairs = keys.map { |key| [key, count_columns(column_family, key, *options)] }
  OrderedHash[*key_count_pairs._flatten_once]
end

#multi_get(column_family, keys, *columns_and_options) ⇒ Object

Multi-key version of Cassandra#get.

This method allows you to select multiple rows with a single query. If a key that is passed in doesn’t exist, an empty hash will be returned.

Supports the same parameters as Cassandra#get.

  • column_family - The column_family that you are reading from.

  • keys - An array of keys to select.

  • column - Either a single super_column or a single column.

  • sub_column - A single sub_column to select.

  • options - Valid options are:

    • :count - The number of columns requested to be returned.

    • :start - The starting value for selecting a range of columns.

    • :finish - The final value for selecting a range of columns.

    • :reversed - If set to true the results will be returned in reverse order.

    • :consistency - Uses the default read consistency if none specified.



623
624
625
626
627
628
629
630
631
632
633
# File 'lib/cassandra/cassandra.rb', line 623

# Fetches several rows through a single _multiget call and returns them in
# the order the keys were given.
#
# Arguments are normalised by extract_and_validate_params against
# READ_DEFAULTS. The result is an OrderedHash with one entry per key. A key
# missing from the _multiget result maps to an empty OrderedHash when the
# column family is a super column family and no sub_column was requested,
# and to nil otherwise.
def multi_get(column_family, keys, *columns_and_options)
  column_family, column, sub_column, options =
    extract_and_validate_params(column_family, keys, columns_and_options, READ_DEFAULTS)

  fetched = _multiget(column_family, keys, column, sub_column,
                      options[:count], options[:start], options[:finish],
                      options[:reversed], options[:consistency])

  # Rebuild the result key by key so it follows the caller's key order.
  keys.each_with_object(OrderedHash.new) do |row_key, ordered|
    ordered[row_key] =
      if (row = fetched[row_key])
        row
      elsif is_super(column_family) && !sub_column
        OrderedHash.new
      end
  end
end

#multi_get_columns(column_family, keys, *columns_and_options) ⇒ Object

Multi-key version of Cassandra#get_columns. Please note that this queries the server for each key passed in.

Supports same parameters as Cassandra#get_columns

  • column_family - The column_family that you are reading from.

  • keys - The row keys to read.

  • columns - Either a single super_column or a list of columns.

  • sub_columns - The list of sub_columns to select.

  • options - Valid options are:

    • :consistency - Uses the default read consistency if none specified.



576
577
578
579
580
# File 'lib/cassandra/cassandra.rb', line 576

# Multi-key column lookup: normalises the arguments with
# extract_and_validate_params (using READ_DEFAULTS) and hands them to
# _multi_get_columns. Only the :consistency option is passed on.
#
# Returns whatever _multi_get_columns returns.
def multi_get_columns(column_family, keys, *columns_and_options)
  validated = extract_and_validate_params(column_family, keys, columns_and_options, READ_DEFAULTS)
  column_family, columns, sub_columns, options = validated

  _multi_get_columns(column_family, keys, columns, sub_columns, options[:consistency])
end

#partitioner ⇒ Object

Returns a string identifying which partitioner is in use by the current cluster. Typically, this will be RandomPartitioner, but it could be OrderPreservingPartitioner as well.

Please note that this only works on version 0.7.0 and higher.



232
233
234
235
236
# File 'lib/cassandra/cassandra.rb', line 232

# Asks the client which partitioner is in use.
#
# Returns false when Cassandra.VERSION, read as a float, is below 0.7;
# otherwise the result of client.describe_partitioner.
def partitioner
  unsupported = Cassandra.VERSION.to_f < 0.7
  unsupported ? false : client.describe_partitioner
end

#remove(column_family, key, *columns_and_options) ⇒ Object

This method is used to delete (actually marking them as deleted with a tombstone) rows, columns, or super columns depending on the parameters passed. If only a key is passed the entire row will be marked as deleted. If a column name is passed in that column will be deleted.

This method can also be used in batch mode. If in batch mode, then we queue up the mutations (a deletion in this case).

  • column_family - The column_family that you are deleting from.

  • key - The row key to delete from.

  • columns - Either a single super_column or a list of columns.

  • sub_columns - The list of sub_columns to select.

  • options - Valid options are:

    • :timestamp - Uses the current time if none specified.

    • :consistency - Uses the default write consistency if none specified.



483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
# File 'lib/cassandra/cassandra.rb', line 483

# Builds one delete mutation per requested column for a single row, then
# either queues it on the active batch or applies it immediately.
#
# Arguments are normalised by extract_and_validate_params against
# WRITE_DEFAULTS. A non-array columns value (including nil) is wrapped in a
# one-element array. The timestamp is options[:timestamp], falling back to
# Time.stamp.
#
# Returns the result of `@batch << ...` when @batch is set, otherwise the
# result of _mutate.
# Raises ArgumentError when an array of columns is combined with a sub_column.
def remove(column_family, key, *columns_and_options)
  column_family, columns, sub_column, options =
    extract_and_validate_params(column_family, key, columns_and_options, WRITE_DEFAULTS)

  if columns.is_a?(Array) && sub_column
    raise ArgumentError, 'remove does not support sub_columns with array of columns'
  end
  columns = [columns] unless columns.is_a?(Array)

  timestamp = options[:timestamp] || Time.stamp

  deletions = columns.map do |column|
    _delete_mutation(column_family, column, sub_column, timestamp)
  end
  mutation_map = { key => { column_family => deletions } }
  consistency = options[:consistency]

  if @batch
    @batch << [mutation_map, consistency]
  else
    _mutate(mutation_map, consistency)
  end
end

#rename_column_family(old_name, new_name) ⇒ Object

Rename a column family. Returns the new schema id.

  • old_name - The current column_family name.

  • new_name - The desired column_family name.



304
305
306
307
308
309
310
311
312
313
314
# File 'lib/cassandra/cassandra.rb', line 304

# Renames a column family through client.system_rename_column_family.
#
# Only runs when Cassandra.VERSION, read as a float, is exactly 0.7; any
# other version returns false without contacting the client.
#
# A CassandraThrift::TimedOutException is reported on stdout and swallowed,
# in which case the method returns nil. The cached @schema is cleared
# either way. Otherwise returns the client's result.
def rename_column_family(old_name, new_name)
  return false if Cassandra.VERSION.to_f != 0.7

  schema_id =
    begin
      client.system_rename_column_family(old_name, new_name)
    rescue CassandraThrift::TimedOutException => timeout
      puts "Timed out: #{timeout.inspect}"
      nil
    end

  @schema = nil
  schema_id
end

#rename_keyspace(old_name, new_name) ⇒ Object

Renames keyspace.

  • old_name - Current keyspace name.

  • new_name - Desired keyspace name.

Returns the new schema id



377
378
379
380
381
382
383
384
385
386
387
388
389
390
# File 'lib/cassandra/cassandra.rb', line 377

# Renames a keyspace through client.system_rename_keyspace.
#
# old_name - current keyspace name.
# new_name - desired keyspace name.
#
# Returns false when Cassandra.VERSION, read as a float, is below 0.7.
# Otherwise returns the client's result, or nil when a
# CassandraThrift::TimedOutException or Thrift::TransportException was
# reported on stdout and swallowed.
#
# When the renamed keyspace is the one this instance currently uses
# (@keyspace), the instance is switched to the new name. The cached
# @keyspaces list is cleared.
def rename_keyspace(old_name, new_name)
  return false if Cassandra.VERSION.to_f < 0.7

  begin
    res = client.system_rename_keyspace(old_name, new_name)
  rescue CassandraThrift::TimedOutException => toe
    puts "Timed out: #{toe.inspect}"
  rescue Thrift::TransportException => te
    puts "Timed out: #{te.inspect}"
  end
  # An explicit receiver is required here: a bare `keyspace = new_name` only
  # binds a throwaway local variable and leaves the instance on the old name.
  # NOTE(review): relies on the class's keyspace= writer, which is defined
  # outside this excerpt. Confirm it exists.
  self.keyspace = new_name if old_name.eql?(@keyspace)
  @keyspaces = nil
  res
end

#ring ⇒ Object

Returns an array of CassandraThrift::TokenRange objects indicating which servers make up the current ring, what their start and end tokens are, and their list of endpoints.

Please note that this only works on version 0.7.0 and higher.



220
221
222
223
224
# File 'lib/cassandra/cassandra.rb', line 220

# Describes the ring for this instance's keyspace.
#
# Returns false when Cassandra.VERSION, read as a float, is below 0.7;
# otherwise the result of client.describe_ring called with @keyspace.
def ring
  if Cassandra.VERSION.to_f < 0.7
    false
  else
    client.describe_ring(@keyspace)
  end
end

#schema_agreement? ⇒ Boolean

This returns true if all servers are in agreement on the schema.

Please note that this only works on version 0.7.0 and higher.

Returns:

  • (Boolean)


188
189
190
191
192
# File 'lib/cassandra/cassandra.rb', line 188

# Reports whether the client sees exactly one schema version.
#
# Returns false when Cassandra.VERSION, read as a float, is below 0.7;
# otherwise true only if client.describe_schema_versions has length 1.
def schema_agreement?
  return false if Cassandra.VERSION.to_f < 0.7

  schema_versions = client.describe_schema_versions
  schema_versions.length == 1
end

#update_column_family(cf_def) ⇒ Object

Update the column family based on the passed in definition.



319
320
321
322
323
324
325
326
327
328
329
# File 'lib/cassandra/cassandra.rb', line 319

# Applies a column family definition through
# client.system_update_column_family.
#
# Returns false when Cassandra.VERSION, read as a float, is below 0.7.
# A CassandraThrift::TimedOutException is reported on stdout and swallowed,
# in which case the method returns nil. The cached @schema is cleared
# either way. Otherwise returns the client's result.
def update_column_family(cf_def)
  return false if Cassandra.VERSION.to_f < 0.7

  schema_id =
    begin
      client.system_update_column_family(cf_def)
    rescue CassandraThrift::TimedOutException => timeout
      puts "Timed out: #{timeout.inspect}"
      nil
    end

  @schema = nil
  schema_id
end

#update_keyspace(ks_def) ⇒ Object

Update the keyspace using the passed in keyspace definition.



395
396
397
398
399
400
401
402
403
404
405
406
407
# File 'lib/cassandra/cassandra.rb', line 395

# Applies a keyspace definition through client.system_update_keyspace.
#
# Returns false when Cassandra.VERSION, read as a float, is below 0.7.
# A CassandraThrift::TimedOutException or Thrift::TransportException is
# reported on stdout and swallowed, in which case the method returns nil.
# The cached @keyspaces list is cleared either way. Otherwise returns the
# client's result.
def update_keyspace(ks_def)
  return false if Cassandra.VERSION.to_f < 0.7

  schema_id =
    begin
      client.system_update_keyspace(ks_def)
    rescue CassandraThrift::TimedOutException => timeout
      puts "Timed out: #{timeout.inspect}"
      nil
    rescue Thrift::TransportException => transport_failure
      puts "Timed out: #{transport_failure.inspect}"
      nil
    end

  @keyspaces = nil
  schema_id
end

#version ⇒ Object

Lists the current cassandra.thrift version.

Please note that this only works on version 0.7.0 and higher.



198
199
200
201
202
# File 'lib/cassandra/cassandra.rb', line 198

# Asks the client for its version.
#
# Returns false when Cassandra.VERSION, read as a float, is below 0.7;
# otherwise the result of client.describe_version.
def version
  too_old = Cassandra.VERSION.to_f < 0.7
  return false if too_old

  client.describe_version
end