Class: Cassandra

Inherits:
Object
Includes:
Columns, Protocol
Defined in:
lib/cassandra/cassandra.rb,
lib/cassandra/long.rb,
lib/cassandra/uuid.rb,
lib/cassandra/columns.rb,
lib/cassandra/protocol.rb,
lib/cassandra/constants.rb,
lib/cassandra/comparable.rb,
lib/cassandra/ordered_hash.rb

Overview

Create a new Cassandra client instance. Accepts a keyspace name and, optionally, a host and port (defaulting to '127.0.0.1' and 9160).

client = Cassandra.new('twitter', '127.0.0.1', 9160)

You can then make calls to the server via the client instance.

client.insert(:UserRelationships, "5", {"user_timeline" => {UUID.new => "1"}})
client.get(:UserRelationships, "5", "user_timeline")

For read methods, valid option parameters are:

:count

How many results to return. Defaults to 100.

:start

Column name token at which to start iterating, inclusive. Defaults to nil, which means the first column in the collation order.

:finish

Column name token at which to stop iterating, inclusive. Defaults to nil, which means no boundary.

:reversed

Swap the direction of the collation order.

:consistency

The consistency level of the request. Defaults to Cassandra::Consistency::ONE (one node must respond). Other valid options are Cassandra::Consistency::ZERO, Cassandra::Consistency::QUORUM, and Cassandra::Consistency::ALL.

Note that some read options have no relevance in some contexts.

For write methods, valid option parameters are:

:timestamp

The transaction timestamp. Defaults to the current time in milliseconds. This is used for conflict resolution by the server; you normally never need to change it.

:consistency

See above.
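The way per-call options combine with the defaults listed above can be sketched in plain Ruby. This is only an illustration: `resolve_options` and `READ_DEFAULTS_SKETCH` are hypothetical names, the symbol `:one` stands in for Cassandra::Consistency::ONE so the snippet runs without the gem, and the library's own validation code is not shown on this page.

```ruby
# Hypothetical copy of the documented read defaults; :one stands in for
# Cassandra::Consistency::ONE so the sketch has no dependencies.
READ_DEFAULTS_SKETCH = {
  :count => 100,
  :start => nil,
  :finish => nil,
  :reversed => false,
  :consistency => :one
}.freeze

# Illustrative helper (not part of the library): reject unknown option
# keys, then let caller-supplied values override the defaults.
def resolve_options(options, defaults)
  unknown = options.keys - defaults.keys
  raise ArgumentError, "Invalid options: #{unknown.inspect}" unless unknown.empty?
  defaults.merge(options)
end

opts = resolve_options({:count => 10, :reversed => true}, READ_DEFAULTS_SKETCH)
```

Options that are not supplied keep their default values, so `opts[:consistency]` is still `:one` here.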

Defined Under Namespace

Modules: Columns, Consistency, Constants, Protocol
Classes: AccessError, Comparable, Long, OrderedHash, UUID

Constant Summary

MAX_INT =
2**31 - 1
WRITE_DEFAULTS =
{
  :count => MAX_INT,
  :timestamp => nil,
  :consistency => Consistency::ONE
}.freeze
READ_DEFAULTS =
{
  :count => 100,
  :start => nil,
  :finish => nil,
  :reversed => false,
  :consistency => Consistency::ONE
}.freeze

Constructor Details

#initialize(keyspace, host = '127.0.0.1', port = 9160, buffer = true) ⇒ Cassandra

Instantiate a new Cassandra and open the connection.



# File 'lib/cassandra/cassandra.rb', line 59

def initialize(keyspace, host = '127.0.0.1', port = 9160, buffer = true)
  @is_super = {}
  @column_name_class = {}
  @sub_column_name_class = {}

  @keyspace = keyspace
  @host = host
  @port = port

  transport = Thrift::BufferedTransport.new(Thrift::Socket.new(@host, @port))
  transport.open
  
  @client = CassandraThrift::Cassandra::SafeClient.new(
    CassandraThrift::Cassandra::Client.new(Thrift::BinaryProtocol.new(transport)),
    transport,
    !buffer)

  keyspaces = @client.get_string_list_property("keyspaces")
  unless keyspaces.include?(@keyspace)
    raise AccessError, "Keyspace #{@keyspace.inspect} not found. Available: #{keyspaces.inspect}"
  end

  @schema = @client.describe_keyspace(@keyspace)
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



# File 'lib/cassandra/cassandra.rb', line 56

def client
  @client
end

#host ⇒ Object (readonly)

Returns the value of attribute host.



# File 'lib/cassandra/cassandra.rb', line 56

def host
  @host
end

#keyspace ⇒ Object (readonly)

Returns the value of attribute keyspace.



# File 'lib/cassandra/cassandra.rb', line 56

def keyspace
  @keyspace
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



# File 'lib/cassandra/cassandra.rb', line 56

def port
  @port
end

#schema ⇒ Object (readonly)

Returns the value of attribute schema.



# File 'lib/cassandra/cassandra.rb', line 56

def schema
  @schema
end

#serializer ⇒ Object (readonly)

Returns the value of attribute serializer.



# File 'lib/cassandra/cassandra.rb', line 56

def serializer
  @serializer
end

#transport ⇒ Object (readonly)

Returns the value of attribute transport.



# File 'lib/cassandra/cassandra.rb', line 56

def transport
  @transport
end

Instance Method Details

#batch(options = {}) ⇒ Object

Open a batch operation and yield. Inserts and deletes will be queued until the block closes, and then sent atomically to the server. Supports the :consistency option, which overrides the consistency set in the individual commands.



# File 'lib/cassandra/cassandra.rb', line 228

def batch(options = {})
  _, _, _, options = 
    validate_params(@schema.keys.first, "", [options], WRITE_DEFAULTS)

  @batch = []
  yield
  compact_mutations!
  _mutate(@batch, options[:consistency])
  @batch = nil
end
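The queue-then-flush behavior described for #batch can be sketched with a small stand-in class. `BatchSketch` and its `sent` log are hypothetical and exist only for illustration; nothing here talks to a server. The `ensure` clause is an addition of this sketch (the listing above resets @batch only on the normal path).

```ruby
# Hypothetical stand-in for the client; it records what would be sent.
class BatchSketch
  attr_reader :sent

  def initialize
    @batch = nil
    @sent = [] # each element represents one round trip (an array of mutations)
  end

  # Queue the mutation while a batch is open, otherwise "send" it at once.
  def insert(key, value)
    mutation = [:insert, key, value]
    @batch ? @batch << mutation : @sent << [mutation]
  end

  # Collect every mutation issued inside the block, then flush them together.
  def batch
    @batch = []
    yield
    @sent << @batch unless @batch.empty?
  ensure
    @batch = nil # sketch-only safeguard: close the batch even if the block raises
  end
end

client = BatchSketch.new
client.insert("1", "a")      # sent on its own
client.batch do
  client.insert("2", "b")    # queued
  client.insert("3", "c")    # queued
end
```

After this runs, `client.sent` holds two entries: the single immediate insert, and one entry containing both batched inserts.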

#clear_column_family!(column_family, options = {}) ⇒ Object

Remove all rows in the column family you request. Supports options :consistency and :timestamp. FIXME: May not currently delete all records without multiple calls, because each call removes only the keys returned by a single get_range (100 by default). Waiting for ranged remove support in Cassandra.



# File 'lib/cassandra/cassandra.rb', line 132

def clear_column_family!(column_family, options = {})
  get_range(column_family).each { |key| remove(column_family, key, options) }
end

#clear_keyspace!(options = {}) ⇒ Object

Remove all rows in the keyspace by clearing each column family in turn. Supports options :consistency and :timestamp. FIXME: May not currently delete all records without multiple calls. Waiting for ranged remove support in Cassandra.



# File 'lib/cassandra/cassandra.rb', line 140

def clear_keyspace!(options = {})
  @schema.keys.each { |column_family| clear_column_family!(column_family, options) }
end

#count_columns(column_family, key, *columns_and_options) ⇒ Object

Count the elements at the column_family:key:[super_column] path you request. Supports the :consistency option.



# File 'lib/cassandra/cassandra.rb', line 148

def count_columns(column_family, key, *columns_and_options)
  column_family, super_column, _, options = 
    validate_params(column_family, key, columns_and_options, READ_DEFAULTS)      
  _count_columns(column_family, key, super_column, options[:consistency])
end

#count_range(column_family, options = {}) ⇒ Object

Count all rows in the column_family you request. Requires the table to be partitioned with OrderPreservingHash. Supports the :start, :finish, and :consistency options. FIXME: Counts at most MAX_INT records.



# File 'lib/cassandra/cassandra.rb', line 220

def count_range(column_family, options = {})
  get_range(column_family, options.merge(:count => MAX_INT)).size
end

#exists?(column_family, key, *columns_and_options) ⇒ Boolean

Return true if the column_family:key:[column]:[sub_column] path you request exists. Supports the :consistency option.

Returns:

  • (Boolean)


# File 'lib/cassandra/cassandra.rb', line 200

def exists?(column_family, key, *columns_and_options)
  column_family, column, sub_column, options = 
    validate_params(column_family, key, columns_and_options, READ_DEFAULTS)
  _multiget(column_family, [key], column, sub_column, 1, nil, nil, nil, options[:consistency])[key]
end

#get(column_family, key, *columns_and_options) ⇒ Object

Return a hash (actually, a Cassandra::OrderedHash) or a single value representing the element at the column_family:key:[column]:[sub_column] path you request. Supports options :count, :start, :finish, :reversed, and :consistency.



# File 'lib/cassandra/cassandra.rb', line 181

def get(column_family, key, *columns_and_options)
  multi_get(column_family, [key], *columns_and_options)[key]
end

#get_columns(column_family, key, *columns_and_options) ⇒ Object

Return a list of single values for the elements at the column_family:key:column:[sub_columns] path you request. Supports the :consistency option.



# File 'lib/cassandra/cassandra.rb', line 164

def get_columns(column_family, key, *columns_and_options)
  column_family, columns, sub_columns, options = 
    validate_params(column_family, key, columns_and_options, READ_DEFAULTS)      
  _get_columns(column_family, key, columns, sub_columns, options[:consistency])
end

#get_range(column_family, options = {}) ⇒ Object

Return a list of keys in the column_family you request. Requires the table to be partitioned with OrderPreservingHash. Supports the :count, :start, :finish, and :consistency options.



# File 'lib/cassandra/cassandra.rb', line 210

def get_range(column_family, options = {})
  column_family, _, _, options = 
    validate_params(column_family, "", [options], READ_DEFAULTS)
  _get_range(column_family, options[:start].to_s, options[:finish].to_s, options[:count], options[:consistency])
end

#insert(column_family, key, hash, options = {}) ⇒ Object

Insert a row for a key. Pass a flat hash for a regular column family, and a nested hash for a super column family. Supports the :consistency and :timestamp options.



# File 'lib/cassandra/cassandra.rb', line 95

def insert(column_family, key, hash, options = {})
  column_family, _, _, options = 
    validate_params(column_family, key, [options], WRITE_DEFAULTS)

  args = [column_family, hash, options[:timestamp] || Time.stamp]
  columns = is_super(column_family) ? hash_to_super_columns(*args) : hash_to_columns(*args)
  mutation = CassandraThrift::BatchMutation.new(
    :key => key,
    :cfmap => {column_family => columns},
    :column_paths => [])

  @batch ? @batch << mutation : _mutate([mutation], options[:consistency])
end
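The difference between the flat hash (regular column family) and the nested hash (super column family) can be sketched as follows. The structs and the helpers `to_columns` and `to_super_columns` are hypothetical illustrations; the library's own hash_to_columns and hash_to_super_columns are not shown on this page and may differ.

```ruby
# Hypothetical value objects, standing in for the Thrift column types.
ColumnSketch = Struct.new(:name, :value, :timestamp)
SuperColumnSketch = Struct.new(:name, :columns)

# Flat hash: each name => value pair becomes one timestamped column.
def to_columns(hash, timestamp)
  hash.map { |name, value| ColumnSketch.new(name, value, timestamp) }
end

# Nested hash: each outer key names a super column that holds sub-columns.
def to_super_columns(hash, timestamp)
  hash.map do |name, sub_hash|
    SuperColumnSketch.new(name, to_columns(sub_hash, timestamp))
  end
end

timestamp = 1 # a fixed placeholder; the real default is the current time
flat = to_columns({"screen_name" => "buttonscat"}, timestamp)
nested = to_super_columns({"user_timeline" => {"id-1" => "1"}}, timestamp)
```

Every column produced by one call shares the same timestamp, which matches how the listing above passes a single timestamp for the whole insert.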

#inspect ⇒ Object



# File 'lib/cassandra/cassandra.rb', line 84

def inspect
  "#<Cassandra:#{object_id}, @keyspace=#{keyspace.inspect}, @schema={#{
    schema.map {|name, hash| ":#{name} => #{hash['type'].inspect}"}.join(', ')
  }}, @host=#{host.inspect}, @port=#{port}>"
end

#multi_count_columns(column_family, keys, *options) ⇒ Object

Multi-key version of Cassandra#count_columns. Supports the :consistency option, the only option count_columns itself uses. FIXME: Not a real multi-key call; needs server support.



# File 'lib/cassandra/cassandra.rb', line 157

def multi_count_columns(column_family, keys, *options)
  OrderedHash[*keys.map { |key| [key, count_columns(column_family, key, *options)] }._flatten_once]
end
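The pairs-to-hash idiom in the listing above can be tried with plain Ruby. In this sketch `count_for` is a hypothetical stand-in for the per-key count_columns call, a plain Hash replaces Cassandra::OrderedHash, and the standard `flatten(1)` is assumed to play the role of the library's `_flatten_once` helper, which is not shown on this page.

```ruby
keys = ["5", "6", "7"]

# Hypothetical stand-in for one count_columns call per key.
count_for = lambda { |key| key.to_i * 2 }

# One [key, count] pair per key: this is a loop of single-key calls,
# not a true multi-key request.
pairs = keys.map { |key| [key, count_for.call(key)] }

# Splat the pairs after flattening exactly one level, as in the listing.
from_splat = Hash[*pairs.flatten(1)]

# Passing the array of pairs directly avoids the flatten step.
from_pairs = Hash[pairs]
```

Flattening only one level matters when the values are themselves arrays (as with get_columns), since a full flatten would break the key and value pairing.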

#multi_get(column_family, keys, *columns_and_options) ⇒ Object

Multi-key version of Cassandra#get. Supports options :count, :start, :finish, :reversed, and :consistency.



# File 'lib/cassandra/cassandra.rb', line 187

def multi_get(column_family, keys, *columns_and_options)
  column_family, column, sub_column, options = 
    validate_params(column_family, keys, columns_and_options, READ_DEFAULTS)

  hash = _multiget(column_family, keys, column, sub_column, options[:count], options[:start], options[:finish], options[:reversed], options[:consistency])
  # Restore order
  ordered_hash = OrderedHash.new
  keys.each { |key| ordered_hash[key] = hash[key] || (OrderedHash.new if is_super(column_family) and !sub_column) }
  ordered_hash
end
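The "Restore order" step above can be illustrated on its own. This is a minimal sketch under stated assumptions: `restore_order` is a hypothetical helper, and a plain Hash (insertion-ordered since Ruby 1.9) stands in for Cassandra::OrderedHash.

```ruby
# Hypothetical helper: rebuild the result in the order the keys were
# requested, filling in a default for keys the server did not return.
def restore_order(keys, unordered, empty_default = nil)
  ordered = {}
  keys.each { |key| ordered[key] = unordered[key] || empty_default }
  ordered
end

# Pretend the server answered in a different order and omitted one key.
server_result = {"b" => {"name" => "Bob"}, "a" => {"name" => "Al"}}
rows = restore_order(["a", "missing", "b"], server_result)
```

Here `rows.keys` follows the requested order, and the key with no data maps to nil (the listing above substitutes an empty OrderedHash instead when a whole super column family row is requested).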

#multi_get_columns(column_family, keys, *options) ⇒ Object

Multi-key version of Cassandra#get_columns. Supports the :consistency option. FIXME: Not a real multi-key call; needs to use a Column predicate.



# File 'lib/cassandra/cassandra.rb', line 173

def multi_get_columns(column_family, keys, *options)
  OrderedHash[*keys.map { |key| [key, get_columns(column_family, key, *options)] }._flatten_once]
end

#remove(column_family, key, *columns_and_options) ⇒ Object

Remove the element at the column_family:key:[column]:[sub_column] path you request. Supports the :consistency and :timestamp options.



# File 'lib/cassandra/cassandra.rb', line 114

def remove(column_family, key, *columns_and_options)
  column_family, column, sub_column, options = 
    validate_params(column_family, key, columns_and_options, WRITE_DEFAULTS)

  args = {:column_family => column_family, :timestamp => options[:timestamp] || Time.stamp}
  columns = is_super(column_family) ? {:super_column => column, :column => sub_column} : {:column => column}
  mutation = CassandraThrift::BatchMutation.new(
    :key => key,
    :cfmap => {},
    :column_paths => [CassandraThrift::ColumnPath.new(args.merge(columns))])

  @batch ? @batch << mutation : _mutate([mutation], options[:consistency])
end
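How the removal path differs between regular and super column families can be sketched with plain hashes. `column_path_args` is a hypothetical helper that mirrors the two hash literals in the listing above; it builds only the arguments and does not construct a real CassandraThrift::ColumnPath.

```ruby
# Hypothetical helper: build the path arguments for a removal.
# For a super column family, the first name addresses the super column
# and the second addresses the column inside it.
def column_path_args(column_family, is_super, column = nil, sub_column = nil, timestamp = 0)
  args = {:column_family => column_family, :timestamp => timestamp}
  columns = is_super ? {:super_column => column, :column => sub_column} : {:column => column}
  args.merge(columns)
end

regular = column_path_args("Users", false, "screen_name")
nested  = column_path_args("UserRelationships", true, "user_timeline", "id-1")
whole   = column_path_args("Users", false)
```

Leaving the column names out, as in `whole`, yields a path with a nil column, which corresponds to addressing the entire row.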