Module: DbCharmer::Sharding::Method::DbBlockGroupMapBase

Included in:
DbBlockGroupMap, DbBlockSchemaMap
Defined in:
lib/db_charmer/sharding/method/db_block_group_map_base.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#block_sizeObject

Sharding keys block size



30
31
32
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 30

def block_size
  @block_size
end

#connectionObject

Mapping db connection



18
19
20
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 18

def connection
  @connection
end

#connection_nameObject

Mapping db connection



18
19
20
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 18

def connection_name
  @connection_name
end

#groups_tableObject

Tablegroups table name



24
25
26
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 24

def groups_table
  @groups_table
end

#map_tableObject

Mapping table name



21
22
23
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 21

def map_table
  @map_table
end

#nameObject


Sharder name



15
16
17
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 15

def name
  @name
end

#shards_tableObject

Shards table name



27
28
29
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 27

def shards_table
  @shards_table
end

Instance Method Details

#allocate_new_block_for_key(key) ⇒ Object




148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 148

def allocate_new_block_for_key(key)
  # Can't find any groups to use for blocks allocation!
  return nil unless group = least_loaded_group

  # Figure out block limits
  start_id = block_start_for_key(key)
  end_id = block_end_for_key(key)

  # Try to insert a new mapping (ignore duplicate key errors)
  sql = <<-SQL
      INSERT INTO #{map_table}
                   (start_id, end_id, group_id, block_size, created_at, updated_at) VALUES
                   (#{start_id}, #{end_id}, #{group.id}, #{block_size}, NOW(), NOW())
  SQL
  connection.execute(sql, "Allocate new block")

  # Increment the blocks counter on the shard
  group_class.update_counters(group.id, :blocks_count => +1)

  # Retry block search after creation
  block_for_key(key)
end

#block_end_for_key(key) ⇒ Object



185
186
187
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 185

def block_end_for_key(key)
  block_size.to_i + block_start_for_key(key)
end

#block_for_key(key, cache = true) ⇒ Object


Returns a block for a key



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 72

def block_for_key(key, cache = true)
  # Cleanup the cache if asked to
  key_range = [ block_start_for_key(key), block_end_for_key(key) ]
  block_cache_key = "%d-%d" % key_range

  if cache
    cached_block = get_cached_block(block_cache_key)
    return cached_block if cached_block
  end

  # Fetch cached value or load from db
  block = begin
    sql = "SELECT * FROM #{map_table} WHERE start_id = #{key_range.first} AND end_id = #{key_range.last} LIMIT 1"
    connection.select_one(sql, 'Find a shard block')
  end

  set_cached_block(block_cache_key, block)

  return block
end

#block_start_for_key(key) ⇒ Object




181
182
183
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 181

def block_start_for_key(key)
  block_size.to_i * (key.to_i / block_size.to_i)
end

#clear_group_info_cacheObject



131
132
133
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 131

def clear_group_info_cache
  @group_info_cache = {}
end

#clear_shard_info_cacheObject



127
128
129
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 127

def clear_shard_info_cache
  @shard_info_cache = {}
end

#create_shard_database(shard) ⇒ Object



239
240
241
242
243
244
245
246
247
248
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 239

def create_shard_database(shard)
  conn_config = shard_connection_config_no_dbname(shard)
  unless ::ActiveRecord::Base.exist_pg_database?(conn_config, shard.db_name)
    old_proxy = ::ActiveRecord::Base.db_charmer_connection_proxy
    ::ActiveRecord::Base.establish_connection(conn_config)
    ::ActiveRecord::Base.connection.create_database(shard.db_name, conn_config)
    ::ActiveRecord::Base.connection.disconnect!
    ::ActiveRecord::Base.switch_connection_to(old_proxy)
  end
end

#drop_all_shard_databasesObject



281
282
283
284
285
286
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 281

def drop_all_shard_databases
  prepare_shard_models
  shard_class.all.each do |shard|
    drop_shard_database(shard)
  end
end

#drop_shard_database(shard) ⇒ Object



250
251
252
253
254
255
256
257
258
259
260
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 250

def drop_shard_database(shard)
  conn_config = shard_connection_config_no_dbname(shard)
  conn_config_with_dbname = conn_config.clone.merge(:database => shard.db_name)
  kill_connections(conn_config_with_dbname, true)
  old_proxy = ::ActiveRecord::Base.db_charmer_connection_proxy
  ::ActiveRecord::Base.establish_connection(conn_config)
  puts "Dropping shard database: #{shard.inspect}"
  ::ActiveRecord::Base.connection.drop_database(shard.db_name)
  ::ActiveRecord::Base.connection.disconnect!
  ::ActiveRecord::Base.switch_connection_to(old_proxy)
end

#get_cached_block(block_cache_key) ⇒ Object




94
95
96
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 94

def get_cached_block(block_cache_key)
  @blocks_cache.read("#{@blocks_cache_prefix}#{block_cache_key}")
end

#group_classObject



189
190
191
192
193
194
195
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 189

def group_class
  if self.is_a?(DbCharmer::Sharding::Method::DbBlockGroupMap)
    "DbCharmer::Sharding::Method::DbBlockGroupMap::Group".classify.constantize
  elsif self.is_a?(DbCharmer::Sharding::Method::DbBlockSchemaMap)
    "DbCharmer::Sharding::Method::DbBlockSchemaMap::Group".classify.constantize
  end
end

#group_info_by_id(group_id, cache = true) ⇒ Object


Load group info



104
105
106
107
108
109
110
111
112
113
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 104

def group_info_by_id(group_id, cache = true)
  # Cleanup the cache if asked to
  @group_info_cache[group_id] = nil unless cache

  # Either load from cache or from db
  @group_info_cache[group_id] ||= begin
    prepare_shard_models
    group_class.find_by_id(group_id)
  end
end

#initialize(config) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 32

def initialize(config)
  @name = config[:name] or raise(ArgumentError, "Missing required :name parameter!")

  @connection = DbCharmer::ConnectionFactory.connect(config[:connection], true)
  if self.is_a?(DbCharmer::Sharding::Method::DbBlockSchemaMap) && ::ActiveRecord::Base.configurations[DbCharmer.env]['adapter'] != 'postgresql'
    raise(ArgumentError, 'DbBlockSchemaMap method can only be used with the postgresql adapter')
  end

  @block_size = (config[:block_size] || 10000).to_i

  @map_table = config[:map_table] or raise(ArgumentError, "Missing required :map_table parameter!")
  @groups_table = config[:groups_table] or raise(ArgumentError, "Missing required :groups_table parameter!")
  @shards_table = config[:shards_table] or raise(ArgumentError, "Missing required :shards_table parameter!")

  # Local caches
  @shard_info_cache = {}
  @group_info_cache = {}

  @blocks_cache = Rails.cache
  @blocks_cache_prefix = config[:blocks_cache_prefix] || "#{@name}_block:"
end

#kill_connections(config, verbose = false) ⇒ Object

kills all connections to a database



271
272
273
274
275
276
277
278
279
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 271

def kill_connections(config, verbose = false)
  set_psql_env(config)
  proclist_sql = "select pg_terminate_backend(procpid) from pg_stat_activity where datname='#{config[:database]}';"
  cmd = "psql -U '#{config[:username]}' -c \"#{proclist_sql}\" -t"

  puts "Killing connections to: #{config[:database]}"
  puts cmd if verbose
  system(cmd)
end

#least_loaded_groupObject



171
172
173
174
175
176
177
178
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 171

def least_loaded_group
  prepare_shard_models

  # Select group
  group = group_class.first(:conditions => { :enabled => true, :open => true }, :order => 'blocks_count ASC')
  raise "Can't find any tablegroups to use for blocks allocation!" unless group
  return group
end

#prepare_shard_modelsObject

Prepare model for working with our shards table



214
215
216
217
218
219
220
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 214

def prepare_shard_models
  shard_class.switch_connection_to(connection)
  shard_class.set_table_name(shards_table)

  group_class.switch_connection_to(connection)
  group_class.set_table_name(groups_table)
end

#set_cached_block(block_cache_key, block) ⇒ Object



98
99
100
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 98

def set_cached_block(block_cache_key, block)
  @blocks_cache.write("#{@blocks_cache_prefix}#{block_cache_key}", block)
end

#set_psql_env(config) ⇒ Object

sets environment variables to be used by an exec’d psql process



263
264
265
266
267
268
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 263

def set_psql_env(config)
  ENV['PGHOST']     = config[:host]          if config[:host]
  ENV['PGPORT']     = config[:port].to_s     if config[:port]
  ENV['PGPASSWORD'] = config[:password].to_s if config[:password]
  ENV['PGUSER']     = config[:username].to_s if config[:username]
end

#shard_classObject



197
198
199
200
201
202
203
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 197

def shard_class
  if self.is_a?(DbCharmer::Sharding::Method::DbBlockGroupMap)
    "DbCharmer::Sharding::Method::DbBlockGroupMap::Shard".classify.constantize
  elsif self.is_a?(DbCharmer::Sharding::Method::DbBlockSchemaMap)
    "DbCharmer::Sharding::Method::DbBlockSchemaMap::Shard".classify.constantize
  end
end

#shard_connection_config_no_dbname(shard) ⇒ Object

This connections settings can be used to drop and create databases



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 223

def shard_connection_config_no_dbname(shard)
  # Format connection name
  connection_name = "db_charmer_db_block_group_map_#{name}_s%d_no_db" % shard.id
  connection.instance_variable_get(:@config).clone.merge(
    # Name for the connection factory
    :connection_name => connection_name,
    # Connection params
    :host => shard.db_host,
    :port => shard.db_port,
    :username => shard.db_user,
    :password => shard.db_pass,
    :database => nil,
    :schema_name => ''
  )
end

#shard_connectionsObject



205
206
207
208
209
210
211
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 205

def shard_connections
  # Find all groups
  prepare_shard_models
  groups = group_class.all(:conditions => { :enabled => true }, :include => :shard)
  # Map them to shards
  groups.map { |group| shard_connection_config(group.shard, group.id) }
end

#shard_for_key(key) ⇒ Object


Raises:

  • (ArgumentError)


55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 55

def shard_for_key(key)
  block = block_for_key(key)

  # Auto-allocate new blocks
  block ||= allocate_new_block_for_key(key)
  raise ArgumentError, "Invalid key value, no shards found for this key and could not create a new block!" unless block

  # Load shard
  group_id = block['group_id'].to_i
  shard_info = shard_info_by_group_id(group_id)

  # Get config
  shard_connection_config(shard_info, group_id)
end

#shard_info_by_group_id(group_id) ⇒ Object

Load shard info using mapping info for a group

Raises:

  • (ArgumentError)


136
137
138
139
140
141
142
143
144
145
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 136

def shard_info_by_group_id(group_id)
  # Load group
  group_info = group_info_by_id(group_id)
  raise ArgumentError, "Invalid group_id: #{group_id}" unless group_info

  shard_info = shard_info_by_id(group_info.shard_id)
  raise ArgumentError, "Invalid shard_id: #{group_info.shard_id}" unless shard_info

  return shard_info
end

#shard_info_by_id(shard_id, cache = true) ⇒ Object

Load shard info



116
117
118
119
120
121
122
123
124
125
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 116

def shard_info_by_id(shard_id, cache = true)
  # Cleanup the cache if asked to
  @shard_info_cache[shard_id] = nil unless cache

  # Either load from cache or from db
  @shard_info_cache[shard_id] ||= begin
    prepare_shard_models
    shard_class.find_by_id(shard_id)
  end
end