Class: RightSupport::DB::CassandraModel

Inherits:
Object
Includes:
Log::Mixin
Defined in:
lib/right_support/db/cassandra_model.rb

Overview

Base class for a column family in a keyspace. Used to access data persisted in Cassandra. Provides wrappers for Cassandra client methods.

Constant Summary

LONG_OPERATION_LOG_PERIOD =

For long-running operations, generate periodic log output after this many columns to help observers figure out how much progress has been made

1_000_000
DEFAULT_TIMEOUT =

Default timeout for client connection to Cassandra server

20
DEFAULT_COUNT =

Default maximum number of columns to retrieve in one chunk

100
DEFAULT_ROW_COUNT =

Default maximum number of rows to retrieve in one chunk

100
METHODS_TO_LOG =
[:multi_get, :get, :get_indexed_slices, :get_columns, :insert, :remove, 'multi_get', 'get', 'get_indexed_slices', 'get_columns', 'insert', 'remove']
@@current_keyspace =
nil
@@connections =
{}

Constants included from Log::Mixin

Log::Mixin::Decorator, Log::Mixin::UNDELEGATED

Methods included from Log::Mixin

default_logger, default_logger=, included

Constructor Details

#initialize(key, attrs = {}) ⇒ CassandraModel

Create column family object

Parameters

key(String)

Primary key for object

attrs(Hash)

Attributes for object which form Cassandra row, with column name as key and column value as value


# File 'lib/right_support/db/cassandra_model.rb', line 766

def initialize(key, attrs = {})
  self.key        = key
  self.attributes = attrs
end

Class Attribute Details

.column_family ⇒ Object

Returns the value of attribute column_family.



# File 'lib/right_support/db/cassandra_model.rb', line 162

def column_family
  @column_family
end

.default_keyspace ⇒ Object (readonly)

Returns the value of attribute default_keyspace.



# File 'lib/right_support/db/cassandra_model.rb', line 161

def default_keyspace
  @default_keyspace
end

Instance Attribute Details

#attributes ⇒ Object

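Attributes of the object which form the Cassandra row, with column name as key and column value as value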



# File 'lib/right_support/db/cassandra_model.rb', line 758

def attributes
  @attributes
end

#key ⇒ Object

Primary key of the object



# File 'lib/right_support/db/cassandra_model.rb', line 758

def key
  @key
end

Class Method Details

.all(k, opt = {}) ⇒ Object

Get row(s) for specified key(s). Unless :count is specified, a maximum of 100 columns are retrieved, except for an individual key request, in which case all columns are retrieved (see .real_get).

Parameters

k(String|Array)

Individual primary key or list of keys on which to match

opt(Hash)

Request options including :consistency and, for column-level control, :count, :start, :finish, :reversed

Return

(Object|nil)

Individual row, or nil if not found, or ordered hash of rows



# File 'lib/right_support/db/cassandra_model.rb', line 327

def all(k, opt = {})
  real_get(k, opt)
end

.batch(*args, &block) ⇒ Object

Open a batch operation and yield self. Inserts and deletes are queued until the block closes, and then sent atomically to the server. Supports the :consistency option, which overrides that set in individual commands.

Parameters

args(Array)

Batch options such as :consistency

Block

Required block making Cassandra requests

Returns

(Array)

Mutation map and consistency level

Raise

RuntimeError

If block not specified



# File 'lib/right_support/db/cassandra_model.rb', line 633

def batch(*args, &block)
  raise "Block required!" unless block_given?
  do_op(:batch, *args, &block)
end

.calculate_random_partitioner_token(key) ⇒ Integer

Compute the token for a given row key, which can provide information on the progress of very large “each” operations, e.g. iterating over all rows of a column family.

Parameters:

  • key (String)

    byte-vector (binary String) representation of the row key

Returns:

  • (Integer)

    the 128-bit token for a given row key, as used by Cassandra’s RandomPartitioner



# File 'lib/right_support/db/cassandra_model.rb', line 205

def calculate_random_partitioner_token(key)
  number = Digest::MD5.hexdigest(key).to_i(16)

  if number >= (2**127)
    # perform two's complement, basically this takes the absolute value of the number as
    # if it were a 128-bit signed number. Equivalent to Java BigInteger.abs() operation.
    result = (number ^ (2**128)-1) + 1
  else
    # we're good
    result = number
  end

  result
end
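
As a rough illustration of how the token can indicate progress, the sketch below restates the computation as a standalone function and maps a token onto the 0..2**127 range it can fall in. The names partitioner_token and progress_fraction are hypothetical, and reading the fraction as progress assumes that rows are visited in token order.

```ruby
require 'digest'

# Standalone restatement of the token computation shown above.
def partitioner_token(key)
  number = Digest::MD5.hexdigest(key).to_i(16)
  # Values with the top bit set are treated as negative 128-bit numbers;
  # take their absolute value, as Java's BigInteger.abs() would.
  number >= 2**127 ? (number ^ (2**128 - 1)) + 1 : number
end

# Hypothetical helper: position of a token within the 0..2**127 range.
def progress_fraction(token)
  token.to_f / 2**127
end

token = partitioner_token("some-row-key")
puts "token=#{token} progress=#{(progress_fraction(token) * 100).round(1)}%"
```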

.config ⇒ Object



# File 'lib/right_support/db/cassandra_model.rb', line 185

def config
  @@config
end

.config=(value) ⇒ Object



# File 'lib/right_support/db/cassandra_model.rb', line 196

def config=(value)
  @@config = normalize_config(value) unless value.nil?
end

.conn ⇒ Cassandra

Return a Cassandra client object connected to a server and authorized to a suitable keyspace. Create a connection if one does not already exist; use BinaryProtocolAccelerated if available.

This method determines the current keyspace from the return value of self.keyspace, which uses @@current_keyspace if it is set and @@default_keyspace otherwise. If a connection already exists for the keyspace, it is re-used. If a connection does not exist, a new persistent connection is created for that keyspace and can be re-used by future requests.

Returns:

  • (Cassandra)

    a client object that can be used to send requests to the ring



# File 'lib/right_support/db/cassandra_model.rb', line 311

def conn()
  @@connections                ||= {}
  @@connections[self.keyspace] = get_connection(@@connections[self.keyspace])
  @@connections[self.keyspace]
end
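
The per-keyspace reuse described above can be sketched without a Cassandra server. This is a simplified illustration, not the class's implementation: FakeClient and ConnectionCache are hypothetical names, and the sketch omits the consistency settings that get_connection re-applies on every call.

```ruby
# Hypothetical stand-in for a Cassandra client; it only remembers its keyspace.
FakeClient = Struct.new(:keyspace)

class ConnectionCache
  attr_reader :created

  def initialize
    @connections = {}
    @created     = 0
  end

  # Return the cached client for the keyspace, creating it on first use.
  def conn(keyspace)
    @connections[keyspace] ||= begin
      @created += 1
      FakeClient.new(keyspace)
    end
  end

  # Keyspaces that have been used so far, like .keyspaces.
  def keyspaces
    @connections.keys
  end
end

cache = ConnectionCache.new
cache.conn("app_test")
cache.conn("app_test")   # reuses the first client
cache.conn("other_test") # creates a second client
puts cache.keyspaces.inspect
```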

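.default_read_consistency ⇒ Object

Override this in a subclass to set the default read consistency level (CL). Returns nil by default, in which case the client's read consistency is left unchanged.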



# File 'lib/right_support/db/cassandra_model.rb', line 171

def default_read_consistency
  nil
end

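.default_write_consistency ⇒ Object

Override this in a subclass to set the default write consistency level (CL). Returns nil by default, in which case the client's write consistency is left unchanged.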



# File 'lib/right_support/db/cassandra_model.rb', line 176

def default_write_consistency
  nil
end

.do_op(meth, *args, &block) ⇒ Object

Perform a Cassandra operation on the connection object. Rescue IOError by automatically reconnecting and retrying the operation.

Parameters

meth(Symbol)

Method to be executed

*args(Array)

Method arguments to forward to the Cassandra connection

Block

Block if any to be executed by method

Return

(Object)

Value returned by executed method



# File 'lib/right_support/db/cassandra_model.rb', line 650

def do_op(meth, *args, &block)
  first_started_at ||= Time.now
  retries          ||= 0
  started_at       = Time.now

  # cassandra functionality
  result           = conn.send(meth, *args, &block)

  # log functionality
  do_op_log(first_started_at, started_at, retries, meth, args[0], args[1])

  return result
rescue IOError
  reconnect
  retries += 1
  retry
end
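
The retry behaviour relies on a Ruby detail: after retry, the method body runs again from the top, but local variables keep their values, which is why the counters are initialised with ||=. A minimal sketch of the same pattern, assuming a hypothetical FlakyClient that fails a fixed number of times (like the method above, it retries without an upper limit):

```ruby
# Hypothetical stand-in for a Cassandra client: raises IOError a fixed number of times.
class FlakyClient
  attr_reader :reconnects

  def initialize(failures)
    @failures   = failures
    @reconnects = 0
  end

  def reconnect!
    @reconnects += 1
  end

  def get(cf, key)
    if @failures > 0
      @failures -= 1
      raise IOError, "connection lost"
    end
    "#{cf}/#{key}"
  end
end

def do_op_sketch(client, meth, *args)
  retries ||= 0 # keeps its value when the body is re-run by `retry`
  result = client.send(meth, *args)
  [result, retries]
rescue IOError
  client.reconnect!
  retries += 1
  retry
end

result, retries = do_op_sketch(FlakyClient.new(2), :get, "Users", "alice")
puts "result=#{result} retries=#{retries}"
```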

.do_op_log(first_started_at, started_at, retries, meth, cf, key) ⇒ Object



# File 'lib/right_support/db/cassandra_model.rb', line 668

def do_op_log(first_started_at, started_at, retries, meth, cf, key)
  now          = Time.now
  attempt_time = now - started_at

  if METHODS_TO_LOG.include?(meth)
    key_count = key.is_a?(Array) ? key.size : 1

    log_string = sprintf("CassandraModel %s, cf=%s, keys=%d, time=%.1fms", meth, cf, key_count, attempt_time*1000)

    if retries && retries > 0
      total_time = now - first_started_at
      log_string += sprintf(", retries=%d, total_time=%.1fms", retries, total_time*1000)
    end

    logger.debug(log_string)
  end
end
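
To show what the resulting debug lines look like, the sketch below builds the message with the same format strings but takes the timings as arguments instead of reading the clock. The helper name op_log_line and the sample values are illustrative only.

```ruby
# Build a message like do_op_log does, using the format strings shown above.
# attempt_time and total_time are in seconds.
def op_log_line(meth, cf, key, attempt_time, retries = 0, total_time = nil)
  key_count = key.is_a?(Array) ? key.size : 1
  line = sprintf("CassandraModel %s, cf=%s, keys=%d, time=%.1fms", meth, cf, key_count, attempt_time * 1000)
  line += sprintf(", retries=%d, total_time=%.1fms", retries, total_time * 1000) if retries > 0
  line
end

puts op_log_line(:get, "Users", "alice", 0.5)
puts op_log_line(:multi_get, "Users", %w[alice bob], 0.25, 2, 1.5)
```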

.env_config ⇒ Object



# File 'lib/right_support/db/cassandra_model.rb', line 189

def env_config
  env = ENV['RACK_ENV']
  raise MissingConfiguration, "CassandraModel config is missing a '#{ENV['RACK_ENV']}' section" \
      unless !@@config.nil? && @@config.keys.include?(env) && @@config[env]
  @@config[env]
end

.get(key, opt = {}) ⇒ Object

Get row for specified primary key and convert into object of given class. Unless :count is specified, all columns of the row are retrieved, in chunks of 100; if :count is specified, at most that many columns are retrieved.

Parameters

key(String)

Primary key on which to match

opt(Hash)

Request options including :consistency and, for column-level control, :count, :start, :finish, :reversed

Return

(CassandraModel|nil)

Instantiated object of given class, or nil if not found



# File 'lib/right_support/db/cassandra_model.rb', line 341

def get(key, opt = {})
  if (attrs = real_get(key, opt)).empty?
    nil
  else
    new(key, attrs)
  end
end

.get_all_indexed_slices(index, key, columns = nil, opt = {}) ⇒ Object

Get all rows for specified secondary key

Parameters

index(String)

Name of secondary index

key(String)

Index value that each selected row is required to match

columns(Array|nil)

Names of columns to be retrieved, defaults to all

opt(Hash)

Request options with only :consistency and :count used

Block

Optional block that is yielded each chunk as it is retrieved as an array like the normally returned result

Return

(OrderedHash)

Rows retrieved with each key, value is columns



# File 'lib/right_support/db/cassandra_model.rb', line 399

def get_all_indexed_slices(index, key, columns = nil, opt = {})
  rows  = Cassandra::OrderedHash.new
  start = ""
  count = opt.delete(:count) || DEFAULT_COUNT
  expr  = do_op(:create_idx_expr, index, key, "EQ")
  opt   = opt[:consistency] ? {:consistency => opt[:consistency]} : {}
  while true
    clause = do_op(:create_idx_clause, [expr], start, count)
    chunk  = self.conn.get_indexed_slices(column_family, clause, columns, opt)
    rows.merge!(chunk)
    if chunk.size == count
      # Assume there are more chunks, use last key as start of next get
      start = chunk.keys.last
    else
      # This must be the last chunk
      break
    end
  end
  rows
end

.get_columns(key, columns, opt = {}) ⇒ Object

Get specific columns in row with specified key

Parameters

key(String)

Primary key on which to match

columns(Array)

Names of columns to be retrieved

opt(Hash)

Request options such as :consistency

Return

(Array)

Values of selected columns in the order specified



# File 'lib/right_support/db/cassandra_model.rb', line 588

def get_columns(key, columns, opt = {})
  do_op(:get_columns, column_family, key, columns, sub_columns = nil, opt)
end

.get_connection(current = nil) ⇒ Object



# File 'lib/right_support/db/cassandra_model.rb', line 286

def get_connection(current=nil)
  config                = env_config
  thrift_client_options = {
    :timeout             => RightSupport::DB::CassandraModel::DEFAULT_TIMEOUT,
    :server_retry_period => nil,
  }

  thrift_client_options.merge!({:protocol => Thrift::BinaryProtocolAccelerated}) if defined? Thrift::BinaryProtocolAccelerated

  current ||= Cassandra.new(keyspace, config["server"], thrift_client_options)
  current.disable_node_auto_discovery!
  current.default_write_consistency = self.default_write_consistency if self.default_write_consistency
  current.default_read_consistency = self.default_read_consistency if self.default_read_consistency
  current
end

.get_indexed(index, key, columns = nil, opt = {}) ⇒ Object

Get all rows for specified secondary key

Parameters

index(String)

Name of secondary index

key(String)

Index value that each selected row is required to match

columns(Array|nil)

Names of columns to be retrieved, defaults to all

opt(Hash)

Request options with only :consistency used

Return

(Array)

Rows retrieved, with each member being an instantiated object of the given class, but each object only contains values for the columns retrieved; array is always empty if a block is given


# File 'lib/right_support/db/cassandra_model.rb', line 547

def get_indexed(index, key, columns = nil, opt = {})
  rows  = []
  start = ""
  count = DEFAULT_COUNT
  expr  = do_op(:create_idx_expr, index, key, "EQ")
  opt   = opt[:consistency] ? {:consistency => opt[:consistency]} : {}
  loop do
    clause     = do_op(:create_idx_clause, [expr], start, count)
    chunk      = do_op(:get_indexed_slices, column_family, clause, columns, opt)
    chunk_rows = []
    chunk.each do |row_key, row_columns|
      if row_columns && row_key != start
        attrs = row_columns.inject({}) { |a, c| a[c.column.name] = c.column.value; a }
        chunk_rows << new(row_key, attrs)
      end
    end
    if block_given?
      yield chunk_rows
    else
      rows.concat(chunk_rows)
    end
    if chunk.size == count
      # Assume there are more chunks, use last key as start of next get
      start = chunk.keys.last
    else
      # This must be the last chunk
      break
    end
  end
  rows
end
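
The chunking loop above can be tried against an in-memory table. In this sketch, fetch_chunk is a hypothetical stand-in for get_indexed_slices that returns up to count rows whose key is greater than or equal to start; each chunk after the first therefore begins with the last row of the previous one, which is why that row is skipped. The sketch assumes a chunk size of at least 2, since with a size of 1 every chunk would consist of the start row alone.

```ruby
# Hypothetical in-memory stand-in for get_indexed_slices: returns up to `count`
# rows whose key is >= `start`, in key order.
def fetch_chunk(table, start, count)
  table.select { |key, _| key >= start }.sort.first(count).to_h
end

# Visit every row exactly once, fetching `count` rows at a time.
def each_row_once(table, count = 2)
  rows  = []
  start = ""
  loop do
    chunk = fetch_chunk(table, start, count)
    chunk.each do |row_key, columns|
      # The start row was already delivered as the last row of the previous chunk.
      rows << [row_key, columns] unless row_key == start
    end
    break unless chunk.size == count # a short chunk must be the last one
    start = chunk.keys.last
  end
  rows
end

table = { "a" => 1, "b" => 2, "c" => 3, "d" => 4, "e" => 5 }
puts each_row_once(table, 2).map(&:first).inspect
```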

.inherited(base) ⇒ Object

Reject usage of CassandraModel under Ruby < 1.9; subclassing raises UnsupportedRubyVersion there



# File 'lib/right_support/db/cassandra_model.rb', line 181

def inherited(base)
  raise UnsupportedRubyVersion, "Support only Ruby >= 1.9" unless RUBY_VERSION >= "1.9"
end

.insert(key, values, opt = {}) ⇒ Object

Insert a row for a key

Parameters

key(String)

Primary key for value

values(Hash)

Values to be stored

opt(Hash)

Request options such as :consistency

Return

(Array)

Mutation map and consistency level



# File 'lib/right_support/db/cassandra_model.rb', line 601

def insert(key, values, opt={})
  do_op(:insert, column_family, key, values, opt)
end

.keyspace ⇒ Object

Return current active keyspace.

Return

keyspace(String)

current_keyspace or default_keyspace



# File 'lib/right_support/db/cassandra_model.rb', line 245

def keyspace
  @@current_keyspace || @@default_keyspace
end

.keyspace=(kyspc) ⇒ Object

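Set the default keyspace. The environment name (RACK_ENV, or 'development' if unset) and, if one applies, the namespace are appended to the given name.

Parameters

kyspc(String)

Base name of the default keyspace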



# File 'lib/right_support/db/cassandra_model.rb', line 254

def keyspace=(kyspc)
  env                = ENV['RACK_ENV'] || 'development'
  nspace             = namespace(env)
  @@default_keyspace = "#{kyspc}_#{env}"
  @@default_keyspace += "_#{nspace}" if nspace
end
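
The resulting name is easiest to see with concrete values. The sketch below reproduces only the string composition; compose_keyspace is a hypothetical helper, and the environment and namespace are passed in rather than read from RACK_ENV and the config.

```ruby
# Compose a keyspace name the way keyspace= does:
# base name, then environment, then an optional namespace.
def compose_keyspace(base, env, namespace = nil)
  name = "#{base}_#{env}"
  name += "_#{namespace}" if namespace
  name
end

puts compose_keyspace("app", "production")     # app_production
puts compose_keyspace("app", "test", "testns") # app_test_testns
```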

.keyspaces ⇒ Object

Return current keyspace names as Array of String (any keyspace that has been used this session).

Return

(Array)

Keyspace names



# File 'lib/right_support/db/cassandra_model.rb', line 237

def keyspaces
  @@connections.keys
end

.namespace(env) ⇒ Object

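As part of constructing the keyspace name, determine if we have a trailing namespace to append. A 'namespace' entry in the config for the given environment takes precedence. Otherwise a namespace applies only to the test environment ('testns') and the integration environment (contents of /etc/namespace, if that file exists); for any other environment the result is nil.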



# File 'lib/right_support/db/cassandra_model.rb', line 222

def namespace(env)
  ( config[env] && config[env]['namespace'] ) || begin
    case env
    when 'test'
      'testns'
    when 'integration'
      File.read('/etc/namespace').strip if File.file?('/etc/namespace')
    end
  end
end

.real_get(k, opt = {}) ⇒ Object

Get raw row(s) for specified primary key(s). Unless :count is specified, a maximum of 100 columns are retrieved, except in the case of an individual primary key request, in which case all columns are retrieved.

Parameters

k(String|Array)

Individual primary key or list of keys on which to match

opt(Hash)

Request options including :consistency and, for column-level control, :count, :start, :finish, :reversed

Return

(Cassandra::OrderedHash)

Individual row or OrderedHash of rows



# File 'lib/right_support/db/cassandra_model.rb', line 361

def real_get(k, opt = {})
  if k.is_a?(Array)
    do_op(:multi_get, column_family, k, opt)
  elsif opt[:count]
    do_op(:get, column_family, k, opt)
  else
    opt         = opt.clone
    opt[:count] = DEFAULT_COUNT
    columns     = Cassandra::OrderedHash.new
    loop do
      chunk = do_op(:get, column_family, k, opt)
      columns.merge!(chunk)
      if chunk.size == opt[:count]
        # Assume there are more chunks, use last key as start of next get
        opt[:start] = chunk.keys.last
      else
        # This must be the last chunk
        break
      end
    end
    columns
  end
end

.reconnect ⇒ Object

Reconnect to Cassandra server. Use BinaryProtocolAccelerated if it is available.

Return

(true|false)

true after reconnecting, or false if no keyspace is set



# File 'lib/right_support/db/cassandra_model.rb', line 691

def reconnect
  return false if keyspace.nil?
  @@connections[keyspace] = get_connection
  true
end

.remove(*args) ⇒ Object

Delete row or columns of row

Parameters

args(Array)

Key, columns, options

Return

(Array)

Mutation map and consistency level



# File 'lib/right_support/db/cassandra_model.rb', line 612

def remove(*args)
  do_op(:remove, column_family, *args)
end

.ring ⇒ Object

Cassandra ring

Return

(Array)

Members of ring



# File 'lib/right_support/db/cassandra_model.rb', line 701

def ring
  conn.ring
end

.stream_all_indexed_slices(index, key) {|row_key, columns| ... } ⇒ Integer

This method is an attempt to circumvent the Cassandra gem limitation of returning only 100 columns for wide rows, and also to help reliably iterate through a column family when the node is busy and experiencing many timeouts.

Internally, it uses Cassandra#get_indexed_slices to find rows that match your index constraint; for each matching row key, it iterates through all columns of that row, in chunks, using Cassandra#get_range. This approach is less efficient than grabbing some column values in the initial #get_indexed_slices, but it allows us to preserve the natural ordering of the columns we yield, and prevents us from yielding any column twice.

A row key may be yielded more than once as each “chunk” of columns from that row is read from the ring, but each column will be yielded exactly once.

Parameters:

  • index (String)

    column name

  • key (String)

    column value

Yields:

  • (row_key, columns)

    yields one or more times for every row that contains a matching index column, ultimately yielding EVERY column in that row

Yield Parameters:

  • row_key (String)

    the row key currently being processed

  • columns (Array)

    an array of CassandraThrift::ColumnOrSuperColumn objects

Returns:

  • (Integer)

    total number of columns read



# File 'lib/right_support/db/cassandra_model.rb', line 439

def stream_all_indexed_slices(index, key)
  expr      = do_op(:create_idx_expr, index, key, "EQ")
  start_row = ''

  last_report   = 0
  total_rows    = 0
  total_columns = 0

  # Loop over all CF rows, with batches of X
  while (start_row != nil)
    # Reset these to their initial values on every iteration thru the loop, in case
    # we backed off due to timeouts (see rescue clauses below)
    #
    # Since this method is made for wide rows, also ask for 10x the default column count.
    row_count    = DEFAULT_ROW_COUNT
    column_count = DEFAULT_COUNT * 10

    clause = do_op(:create_idx_clause, [expr], start_row, row_count)

    # Ask for a single column from each row, because we don't care about the column values
    # in this step; we just want the row keys that contain a matching index column.
    begin
      row_keys = self.conn.get_indexed_slices(column_family, clause, :count => 1).keys
    rescue Exception => e
      if retryable_read_timeout?(e)
        logger.error "CassandraModel#stream_all_indexed_slices retrying get_indexed_slices with fewer rows due to a %s: %s @ %s (cf='%s' start_row='%s' row_count=%d)" %
                       [e.class.name, e.message, e.backtrace.first, column_family, start_row, row_count]
        row_count /= 10 if row_count > 1
        retry
      else
        raise
      end
    end

    row_keys.each do |row_key|
      # We already processed this row the previous iteration; skip it
      next if row_key == start_row

      total_rows += 1
      start_column = ''

      while start_column != nil
        begin
          options = {
            :start_key       => row_key,
            :finish_key      => row_key,
            :start           => start_column,
            :count           => column_count,
            :slices_not_hash => true
          }

          columns = self.conn.get_range(column_family, options).first.columns

          if columns[0].column.name == start_column
            total_columns += columns.size - 1
            yield(row_key, columns[1..-1])
          else
            total_columns += columns.size
            yield(row_key, columns)
          end

          # Help Ops figure out where we are
          if (total_columns - last_report > LONG_OPERATION_LOG_PERIOD)
            logger.info "CassandraModel#stream_all_indexed_slices got RangeSlice total_rows=%d total_columns=%d (cf='%s' row='%s' start='%s' count=%d)" %
                          [total_rows, total_columns, column_family, row_key, start_column, column_count]
            last_report = total_columns
          end

          if columns.size >= column_count
            start_column = columns.last.column.name
          else
            start_column = nil
          end
        rescue Exception => e
          if retryable_read_timeout?(e)
            logger.error "CassandraModel#stream_all_indexed_slices retrying get_range with fewer cols due to a %s: %s @ %s (cf='%s' row='%s' start='%s' count=%d)" %
                           [e.class.name, e.message, e.backtrace.first, column_family, row_key, start_column, column_count]
            column_count /= 10 if column_count > 1
            retry
          else
            raise
          end
        end
      end
    end

    if row_keys.size >= row_count
      start_row = row_keys.last
    else
      start_row = nil
    end
  end

  total_columns
end
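
The timeout handling above shrinks the request size by a factor of ten and retries. A minimal sketch of that back-off, assuming Timeout::Error stands in for whatever retryable_read_timeout? accepts; fetch_with_backoff is a hypothetical helper, and unlike the method above it re-raises once the size cannot shrink any further.

```ruby
require 'timeout'

# Call the block with a batch size, dividing the size by 10 after each timeout.
# Returns the list of sizes that were attempted.
def fetch_with_backoff(count)
  attempts = []
  begin
    attempts << count
    yield count
  rescue Timeout::Error
    raise if count <= 1 # nothing left to shrink; give up (an addition in this sketch)
    count /= 10
    retry
  end
  attempts
end

# Pretend the server times out for any request larger than 10 columns.
sizes = fetch_with_backoff(1000) { |n| raise Timeout::Error if n > 10 }
puts sizes.inspect
```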

.with_keyspace(keyspace, append_env = true, &block) ⇒ Object

Temporarily change the working keyspace for this class for the duration of the block. Resets working keyspace back to default once control has returned to the caller.

Parameters

keyspace(String)

Keyspace name

append_env(true|false)

optional; default true - whether to append the environment name

block(Proc)

Code that will be called in keyspace context



# File 'lib/right_support/db/cassandra_model.rb', line 269

def with_keyspace(keyspace, append_env=true, &block)
  @@current_keyspace = keyspace
  env                = ENV['RACK_ENV'] || 'development'
  nspace             = namespace(env)
  if append_env
    if nspace
      tail = "_#{env}_#{nspace}"
    else
      tail = "_#{env}"
    end
    @@current_keyspace += tail unless @@current_keyspace.end_with?(tail)
  end
  block.call
ensure
  @@current_keyspace = nil
end
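
The switch-and-reset behaviour can be sketched with an instance variable in place of @@current_keyspace. KeyspaceSwitcher is a hypothetical class, and the sketch leaves out the environment and namespace suffixes. Note that the reset is to nil rather than to the previous value, so in both the sketch and the method above a nested call leaves the outer block on the default keyspace once the inner block returns.

```ruby
class KeyspaceSwitcher
  def initialize(default)
    @default = default
    @current = nil
  end

  # Active keyspace: the temporary one if set, otherwise the default.
  def keyspace
    @current || @default
  end

  def with_keyspace(name)
    @current = name
    yield
  ensure
    @current = nil # always fall back to the default, even if the block raises
  end
end

switcher = KeyspaceSwitcher.new("app_test")
switcher.with_keyspace("audit_test") { puts switcher.keyspace }
puts switcher.keyspace
```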

Instance Method Details

#[](key) ⇒ Object

Column value

Parameters

key(String|Integer)

Column name or key

Return

(Object|nil)

Column value, or nil if not found



# File 'lib/right_support/db/cassandra_model.rb', line 804

def [](key)
  ret = attributes[key]
  return ret if ret
  if key.kind_of? Integer
    return attributes[Cassandra::Long.new(key)]
  end
end

#[]=(key, value) ⇒ Object

Store new column value

Parameters

key(String|Integer)

Column name or key

value(Object)

Value to be stored

Return

(Object)

Value stored



# File 'lib/right_support/db/cassandra_model.rb', line 820

def []=(key, value)
  attributes[key] = value
end

#destroy ⇒ Object

Delete object from Cassandra

Return

(Object)

Value returned by .remove for this key



# File 'lib/right_support/db/cassandra_model.rb', line 828

def destroy
  self.class.remove(key)
end

#reload ⇒ Object

Load object from Cassandra without modifying this object

Return

(CassandraModel)

Object as stored in Cassandra



# File 'lib/right_support/db/cassandra_model.rb', line 784

def reload
  self.class.get(key)
end

#reload! ⇒ Object

Reload object value from Cassandra and update this object

Return

(CassandraModel)

This object after reload from Cassandra



# File 'lib/right_support/db/cassandra_model.rb', line 792

def reload!
  self.attributes = self.class.real_get(key)
  self
end

#save ⇒ Object

Store object in Cassandra

Return

true

Always return true



# File 'lib/right_support/db/cassandra_model.rb', line 775

def save
  self.class.insert(key, attributes)
  true
end