Class: MigrationBundler::Databases::CassandraDatabase

Inherits:
AbstractDatabase show all
Defined in:
lib/migration_bundler/databases/cassandra_database.rb

Instance Attribute Summary collapse

Attributes inherited from AbstractDatabase

#url

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from AbstractDatabase

#to_s

Constructor Details

#initialize(url) ⇒ CassandraDatabase

Returns a new instance of CassandraDatabase.



19
20
21
22
23
24
# File 'lib/migration_bundler/databases/cassandra_database.rb', line 19

# Connects to the Cassandra node that +url+ points at and remembers the
# keyspace named by the URL's path.
#
# @param url [#host, #port, #path] database location; port 9042 is used
#   when the URL does not specify one
def initialize(url)
  super(url)
  @client = Cql::Client.connect(host: url.host, port: url.port || 9042)
  @keyspace = url.path[1..-1] # everything after the leading "/"
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



7
8
9
# File 'lib/migration_bundler/databases/cassandra_database.rb', line 7

# Reader for the +@client+ instance variable, which the constructor fills
# with the result of Cql::Client.connect.
#
# @return [Object] the stored client
def client
  @client
end

#keyspace ⇒ Object (readonly)

Returns the value of attribute keyspace.



7
8
9
# File 'lib/migration_bundler/databases/cassandra_database.rb', line 7

# Reader for the +@keyspace+ instance variable, which the constructor sets
# to the URL path with its leading slash removed.
#
# @return [String, nil] the stored keyspace name
def keyspace
  @keyspace
end

Class Method Details

.exception_class ⇒ Object



14
15
16
# File 'lib/migration_bundler/databases/cassandra_database.rb', line 14

# Names the exception class associated with this database type.
#
# @return [Class] always Cql::CqlError
def exception_class
  Cql::CqlError
end

.migration_ext ⇒ Object



10
11
12
# File 'lib/migration_bundler/databases/cassandra_database.rb', line 10

# File extension, leading dot included, reported for this database type.
#
# @return [String] always ".cql"
def migration_ext
  '.cql'
end

Instance Method Details

#all_versions ⇒ Object



44
45
46
47
48
# File 'lib/migration_bundler/databases/cassandra_database.rb', line 44

# Lists every version stored in the schema_migrations table of the current
# keyspace, in the ascending order requested from the server.
#
# @return [Array] the 'version' value of each returned row
def all_versions
  client.use(keyspace)
  query = "SELECT version FROM schema_migrations WHERE partition_key = 0 ORDER BY version ASC"
  client.execute(query).each.map { |record| record['version'] }
end

#create_migrations_table ⇒ Object



63
64
65
66
# File 'lib/migration_bundler/databases/cassandra_database.rb', line 63

# Creates the keyspace (SimpleStrategy, replication factor 1) and the
# schema_migrations table inside it. Both statements use IF NOT EXISTS.
#
# @return [Object] whatever the client returns for the CREATE TABLE call
def create_migrations_table
  keyspace_cql = "CREATE KEYSPACE IF NOT EXISTS #{keyspace} WITH replication = {'class' : 'SimpleStrategy', 'replication_factor' : 1};"
  table_cql = "CREATE TABLE IF NOT EXISTS #{keyspace}.schema_migrations (partition_key INT, version VARINT, PRIMARY KEY (partition_key, version));"
  client.execute(keyspace_cql)
  client.execute(table_cql)
end

#current_version ⇒ Object



38
39
40
41
42
# File 'lib/migration_bundler/databases/cassandra_database.rb', line 38

# Fetches the highest version stored in schema_migrations for the current
# keyspace.
#
# @return [Object, nil] the 'version' value of the first row when ordered
#   descending, or nil when the query returns no rows
def current_version
  client.use(keyspace)
  newest = client.execute("SELECT version FROM schema_migrations WHERE partition_key = 0 ORDER BY version DESC LIMIT 1")
  return nil if newest.empty?

  newest.each.first['version']
end

#drop(keyspaces = [keyspace]) ⇒ Object



59
60
61
# File 'lib/migration_bundler/databases/cassandra_database.rb', line 59

# Issues DROP KEYSPACE IF EXISTS for each named keyspace.
#
# @param keyspaces [Array<String>] keyspaces to remove; defaults to this
#   database's own keyspace
# @return [Array<String>] the +keyspaces+ argument
def drop(keyspaces = [keyspace])
  keyspaces.each do |name|
    client.execute("DROP KEYSPACE IF EXISTS #{name}")
  end
end

#dump_rows(table_name) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/migration_bundler/databases/cassandra_database.rb', line 68

# Renders every row of +table_name+ as a CQL INSERT statement.
#
# The previous listing had lost the tokens around the column metadata (it
# read `rows..each do ||` and `rows.[column].type`), which is not valid
# Ruby. They are restored here as `rows.metadata`.
# NOTE(review): assumes the query result exposes #metadata, yielding
# objects with #column_name and #type and indexable by column name, as
# cql-rb does — confirm against the installed client version.
#
# @param table_name [String] table to dump; interpolated into the query
#   verbatim
# @return [Array<String>] one INSERT statement per row
def dump_rows(table_name)
  client.use(keyspace)
  rows = client.execute "SELECT * FROM #{table_name}"
  metadata = rows.metadata

  column_names = []
  metadata.each { |column| column_names << column.column_name }
  # The column list is identical for every row, so build it once.
  column_list = column_names.join(', ')

  statements = []
  rows.each do |row|
    values = column_names.map do |name|
      serialize_value_of_type(row[name], metadata[name].type)
    end
    statements << "INSERT INTO #{table_name} (#{column_list}) VALUES (#{values.join(', ')});"
  end
  statements
end

#execute_migration(cql) ⇒ Object



55
56
57
# File 'lib/migration_bundler/databases/cassandra_database.rb', line 55

# Runs a migration script by sending each of its statements to the client.
#
# Statements are separated on semicolons, but only on those that actually
# terminate a statement: a semicolon inside a string literal, a quoted
# identifier or a comment no longer cuts the statement in two (a plain
# `split(';')` did). Pieces that are blank after stripping are skipped;
# every other piece is passed to the client unmodified.
#
# @param cql [String] one or more CQL statements
# @return [Array<String>] the pieces the script was split into
def execute_migration(cql)
  split_cql_statements(cql).each do |statement|
    client.execute(statement) unless statement.strip.empty?
  end
end

# Splits a CQL script into the text between statement-terminating
# semicolons, leaving quoted text and comments intact. Helper for
# #execute_migration.
#
# @param cql [String] script to split
# @return [Array<String>] non-empty pieces, without the separators
def split_cql_statements(cql)
  # Alternatives are tried in order at each position, so a quote or comment
  # opener consumes its whole span before the catch-all can stop on a ";".
  # An unterminated quote or block comment falls through to the catch-all.
  cql.scan(%r{
    (?:
      --[^\n]*            # line comment introduced by two dashes
      | //[^\n]*          # line comment introduced by two slashes
      | /\*.*?\*/         # block comment
      | '(?:[^']|'')*'    # string literal, doubled quote is an escape
      | "(?:[^"]|"")*"    # quoted identifier
      | [^;]              # any other character except the separator
    )+
  }mx)
end

#insert_version(version) ⇒ Object



50
51
52
53
# File 'lib/migration_bundler/databases/cassandra_database.rb', line 50

# Records +version+ in the schema_migrations table of the current keyspace,
# passing it to the client as a bound value for the "?" placeholder.
#
# @param version [Object] version to store
# @return [Object] whatever the client returns for the INSERT
def insert_version(version)
  client.use(keyspace)
  insert_cql = "INSERT INTO schema_migrations (partition_key, version) VALUES (0, ?)"
  client.execute(insert_cql, version)
end

#migrations_table? ⇒ Boolean

Returns:

  • (Boolean)


26
27
28
29
30
# File 'lib/migration_bundler/databases/cassandra_database.rb', line 26

# Checks the 'system' keyspace for a schema_migrations column family that
# belongs to this database's keyspace. Note that this switches the client
# to the 'system' keyspace.
#
# @return [Boolean] true when the lookup returns at least one row
def migrations_table?
  client.use('system')
  lookup = "SELECT columnfamily_name FROM schema_columnfamilies WHERE keyspace_name='#{keyspace}' AND columnfamily_name='schema_migrations'"
  matches = client.execute(lookup)
  !matches.empty?
end

#origin_version ⇒ Object



32
33
34
35
36
# File 'lib/migration_bundler/databases/cassandra_database.rb', line 32

# Fetches the lowest version stored in schema_migrations for the current
# keyspace.
#
# @return [Object, nil] the 'version' value of the first row when ordered
#   ascending, or nil when the query returns no rows
def origin_version
  client.use(keyspace)
  oldest = client.execute("SELECT version FROM schema_migrations WHERE partition_key = 0 ORDER BY version ASC LIMIT 1")
  return nil if oldest.empty?

  oldest.each.first['version']
end

#serialize_value_of_type(value, type) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/migration_bundler/databases/cassandra_database.rb', line 87

# Renders +value+ as a CQL literal suitable for an INSERT statement.
#
# Changes from the previous behaviour:
# * nil is rendered as NULL whatever the column type. Before, a nil blob
#   raised NoMethodError and a nil list/set came out as "[NULL]"/"{NULL}".
# * Single quotes inside strings are doubled, so a value such as "it's"
#   no longer produces a broken statement.
# * Map columns are rendered as {key: value, ...} instead of a flat,
#   comma-separated list of keys and values.
#
# @param value [Object] the value read from a row
# @param type [Symbol, Array] column type; collections are arrays whose
#   first element is :list, :set or :map.
#   NOTE(review): assumes maps are described as [:map, key_type,
#   value_type] — confirm against the client library.
# @return [String, Object] the literal; values that need no quoting
#   (numbers, for instance) are returned as-is for the caller to
#   interpolate
def serialize_value_of_type(value, type)
  return 'NULL' if value.nil?

  if type.is_a?(Array)
    case type.first
    when :list
      return "[#{serialize_value_of_type(value, type.last)}]"
    when :set
      return "{#{serialize_value_of_type(value, type.last)}}"
    when :map
      if value.respond_to?(:each_pair)
        _, key_type, value_type = type
        pairs = value.each_pair.map do |key, item|
          "#{serialize_value_of_type(key, key_type)}: #{serialize_value_of_type(item, value_type)}"
        end
        return "{#{pairs.join(', ')}}"
      end
    end
  end

  if value.kind_of?(Enumerable)
    # Collection contents: serialize each element with the element type.
    value.map { |element| serialize_value_of_type(element, type) }.join(', ')
  elsif type == :blob
    "0x#{value.unpack('H*').first}"
  else
    case value
    when String then "'#{value.gsub("'", "''")}'" # '' escapes a quote in CQL
    when Cql::TimeUuid then value.to_s
    else value
    end
  end
end