Module: DbCharmer::Sharding::Method::DbBlockGroupMapBase
- Included in:
- DbBlockGroupMap, DbBlockSchemaMap
- Defined in:
- lib/db_charmer/sharding/method/db_block_group_map_base.rb
Instance Attribute Summary collapse
-
#block_size ⇒ Object
Sharding keys block size.
-
#connection ⇒ Object
Mapping db connection.
-
#connection_name ⇒ Object
Mapping db connection.
-
#groups_table ⇒ Object
Tablegroups table name.
-
#map_table ⇒ Object
Mapping table name.
-
#name ⇒ Object
————————————————————————————————————— Sharder name.
-
#shards_table ⇒ Object
Shards table name.
Instance Method Summary collapse
-
#allocate_new_block_for_key(key) ⇒ Object
—————————————————————————————————————.
- #block_end_for_key(key) ⇒ Object
-
#block_for_key(key, cache = true) ⇒ Object
————————————————————————————————————— Returns a block for a key.
-
#block_start_for_key(key) ⇒ Object
—————————————————————————————————————.
- #clear_group_info_cache ⇒ Object
- #clear_shard_info_cache ⇒ Object
- #create_shard_database(shard) ⇒ Object
- #drop_all_shard_databases ⇒ Object
- #drop_shard_database(shard) ⇒ Object
-
#get_cached_block(block_cache_key) ⇒ Object
—————————————————————————————————————.
- #group_class ⇒ Object
-
#group_info_by_id(group_id, cache = true) ⇒ Object
————————————————————————————————————— Load group info.
- #initialize(config) ⇒ Object
-
#kill_connections(config, verbose = false) ⇒ Object
kills all connections to a database.
- #least_loaded_group ⇒ Object
-
#prepare_shard_models ⇒ Object
Prepare model for working with our shards table.
- #set_cached_block(block_cache_key, block) ⇒ Object
-
#set_psql_env(config) ⇒ Object
sets environment variables to be used by an exec’d psql process.
- #shard_class ⇒ Object
-
#shard_connection_config_no_dbname(shard) ⇒ Object
This connections settings can be used to drop and create databases.
- #shard_connections ⇒ Object
-
#shard_for_key(key) ⇒ Object
—————————————————————————————————————.
-
#shard_info_by_group_id(group_id) ⇒ Object
Load shard info using mapping info for a group.
-
#shard_info_by_id(shard_id, cache = true) ⇒ Object
Load shard info.
Instance Attribute Details
#block_size ⇒ Object
Sharding keys block size
30 31 32 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 30 def block_size @block_size end |
#connection ⇒ Object
Mapping db connection
18 19 20 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 18 def connection @connection end |
#connection_name ⇒ Object
Mapping db connection
18 19 20 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 18 def connection_name @connection_name end |
#groups_table ⇒ Object
Tablegroups table name
24 25 26 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 24 def groups_table @groups_table end |
#map_table ⇒ Object
Mapping table name
21 22 23 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 21 def map_table @map_table end |
#name ⇒ Object
Sharder name
15 16 17 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 15 def name @name end |
#shards_table ⇒ Object
Shards table name
27 28 29 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 27 def shards_table @shards_table end |
Instance Method Details
#allocate_new_block_for_key(key) ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 148 def allocate_new_block_for_key(key) # Can't find any groups to use for blocks allocation! return nil unless group = least_loaded_group # Figure out block limits start_id = block_start_for_key(key) end_id = block_end_for_key(key) # Try to insert a new mapping (ignore duplicate key errors) sql = <<-SQL INSERT INTO #{map_table} (start_id, end_id, group_id, block_size, created_at, updated_at) VALUES (#{start_id}, #{end_id}, #{group.id}, #{block_size}, NOW(), NOW()) SQL connection.execute(sql, "Allocate new block") # Increment the blocks counter on the shard group_class.update_counters(group.id, :blocks_count => +1) # Retry block search after creation block_for_key(key) end |
#block_end_for_key(key) ⇒ Object
185 186 187 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 185 def block_end_for_key(key) block_size.to_i + block_start_for_key(key) end |
#block_for_key(key, cache = true) ⇒ Object
Returns a block for a key
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 72 def block_for_key(key, cache = true) # Cleanup the cache if asked to key_range = [ block_start_for_key(key), block_end_for_key(key) ] block_cache_key = "%d-%d" % key_range if cache cached_block = get_cached_block(block_cache_key) return cached_block if cached_block end # Fetch cached value or load from db block = begin sql = "SELECT * FROM #{map_table} WHERE start_id = #{key_range.first} AND end_id = #{key_range.last} LIMIT 1" connection.select_one(sql, 'Find a shard block') end set_cached_block(block_cache_key, block) return block end |
#block_start_for_key(key) ⇒ Object
181 182 183 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 181 def block_start_for_key(key) block_size.to_i * (key.to_i / block_size.to_i) end |
#clear_group_info_cache ⇒ Object
131 132 133 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 131 def clear_group_info_cache @group_info_cache = {} end |
#clear_shard_info_cache ⇒ Object
127 128 129 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 127 def clear_shard_info_cache @shard_info_cache = {} end |
#create_shard_database(shard) ⇒ Object
239 240 241 242 243 244 245 246 247 248 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 239 def create_shard_database(shard) conn_config = shard_connection_config_no_dbname(shard) unless ::ActiveRecord::Base.exist_pg_database?(conn_config, shard.db_name) old_proxy = ::ActiveRecord::Base.db_charmer_connection_proxy ::ActiveRecord::Base.establish_connection(conn_config) ::ActiveRecord::Base.connection.create_database(shard.db_name, conn_config) ::ActiveRecord::Base.connection.disconnect! ::ActiveRecord::Base.switch_connection_to(old_proxy) end end |
#drop_all_shard_databases ⇒ Object
281 282 283 284 285 286 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 281 def drop_all_shard_databases prepare_shard_models shard_class.all.each do |shard| drop_shard_database(shard) end end |
#drop_shard_database(shard) ⇒ Object
250 251 252 253 254 255 256 257 258 259 260 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 250 def drop_shard_database(shard) conn_config = shard_connection_config_no_dbname(shard) conn_config_with_dbname = conn_config.clone.merge(:database => shard.db_name) kill_connections(conn_config_with_dbname, true) old_proxy = ::ActiveRecord::Base.db_charmer_connection_proxy ::ActiveRecord::Base.establish_connection(conn_config) puts "Dropping shard database: #{shard.inspect}" ::ActiveRecord::Base.connection.drop_database(shard.db_name) ::ActiveRecord::Base.connection.disconnect! ::ActiveRecord::Base.switch_connection_to(old_proxy) end |
#get_cached_block(block_cache_key) ⇒ Object
94 95 96 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 94 def get_cached_block(block_cache_key) @blocks_cache.read("#{@blocks_cache_prefix}#{block_cache_key}") end |
#group_class ⇒ Object
189 190 191 192 193 194 195 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 189 def group_class if self.is_a?(DbCharmer::Sharding::Method::DbBlockGroupMap) "DbCharmer::Sharding::Method::DbBlockGroupMap::Group".classify.constantize elsif self.is_a?(DbCharmer::Sharding::Method::DbBlockSchemaMap) "DbCharmer::Sharding::Method::DbBlockSchemaMap::Group".classify.constantize end end |
#group_info_by_id(group_id, cache = true) ⇒ Object
Load group info
104 105 106 107 108 109 110 111 112 113 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 104 def group_info_by_id(group_id, cache = true) # Cleanup the cache if asked to @group_info_cache[group_id] = nil unless cache # Either load from cache or from db @group_info_cache[group_id] ||= begin prepare_shard_models group_class.find_by_id(group_id) end end |
#initialize(config) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 32 def initialize(config) @name = config[:name] or raise(ArgumentError, "Missing required :name parameter!") @connection = DbCharmer::ConnectionFactory.connect(config[:connection], true) if self.is_a?(DbCharmer::Sharding::Method::DbBlockSchemaMap) && ::ActiveRecord::Base.configurations[DbCharmer.env]['adapter'] != 'postgresql' raise(ArgumentError, 'DbBlockSchemaMap method can only be used with the postgresql adapter') end @block_size = (config[:block_size] || 10000).to_i @map_table = config[:map_table] or raise(ArgumentError, "Missing required :map_table parameter!") @groups_table = config[:groups_table] or raise(ArgumentError, "Missing required :groups_table parameter!") @shards_table = config[:shards_table] or raise(ArgumentError, "Missing required :shards_table parameter!") # Local caches @shard_info_cache = {} @group_info_cache = {} @blocks_cache = Rails.cache @blocks_cache_prefix = config[:blocks_cache_prefix] || "#{@name}_block:" end |
#kill_connections(config, verbose = false) ⇒ Object
kills all connections to a database
271 272 273 274 275 276 277 278 279 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 271 def kill_connections(config, verbose = false) set_psql_env(config) proclist_sql = "select pg_terminate_backend(procpid) from pg_stat_activity where datname='#{config[:database]}';" cmd = "psql -U '#{config[:username]}' -c \"#{proclist_sql}\" -t" puts "Killing connections to: #{config[:database]}" puts cmd if verbose system(cmd) end |
#least_loaded_group ⇒ Object
171 172 173 174 175 176 177 178 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 171 def least_loaded_group prepare_shard_models # Select group group = group_class.first(:conditions => { :enabled => true, :open => true }, :order => 'blocks_count ASC') raise "Can't find any tablegroups to use for blocks allocation!" unless group return group end |
#prepare_shard_models ⇒ Object
Prepare model for working with our shards table
214 215 216 217 218 219 220 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 214 def prepare_shard_models shard_class.switch_connection_to(connection) shard_class.set_table_name(shards_table) group_class.switch_connection_to(connection) group_class.set_table_name(groups_table) end |
#set_cached_block(block_cache_key, block) ⇒ Object
98 99 100 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 98 def set_cached_block(block_cache_key, block) @blocks_cache.write("#{@blocks_cache_prefix}#{block_cache_key}", block) end |
#set_psql_env(config) ⇒ Object
sets environment variables to be used by an exec’d psql process
263 264 265 266 267 268 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 263 def set_psql_env(config) ENV['PGHOST'] = config[:host] if config[:host] ENV['PGPORT'] = config[:port].to_s if config[:port] ENV['PGPASSWORD'] = config[:password].to_s if config[:password] ENV['PGUSER'] = config[:username].to_s if config[:username] end |
#shard_class ⇒ Object
197 198 199 200 201 202 203 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 197 def shard_class if self.is_a?(DbCharmer::Sharding::Method::DbBlockGroupMap) "DbCharmer::Sharding::Method::DbBlockGroupMap::Shard".classify.constantize elsif self.is_a?(DbCharmer::Sharding::Method::DbBlockSchemaMap) "DbCharmer::Sharding::Method::DbBlockSchemaMap::Shard".classify.constantize end end |
#shard_connection_config_no_dbname(shard) ⇒ Object
This connections settings can be used to drop and create databases
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 223 def shard_connection_config_no_dbname(shard) # Format connection name connection_name = "db_charmer_db_block_group_map_#{name}_s%d_no_db" % shard.id connection.instance_variable_get(:@config).clone.merge( # Name for the connection factory :connection_name => connection_name, # Connection params :host => shard.db_host, :port => shard.db_port, :username => shard.db_user, :password => shard.db_pass, :database => nil, :schema_name => '' ) end |
#shard_connections ⇒ Object
205 206 207 208 209 210 211 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 205 def shard_connections # Find all groups prepare_shard_models groups = group_class.all(:conditions => { :enabled => true }, :include => :shard) # Map them to shards groups.map { |group| shard_connection_config(group.shard, group.id) } end |
#shard_for_key(key) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 55 def shard_for_key(key) block = block_for_key(key) # Auto-allocate new blocks block ||= allocate_new_block_for_key(key) raise ArgumentError, "Invalid key value, no shards found for this key and could not create a new block!" unless block # Load shard group_id = block['group_id'].to_i shard_info = shard_info_by_group_id(group_id) # Get config shard_connection_config(shard_info, group_id) end |
#shard_info_by_group_id(group_id) ⇒ Object
Load shard info using mapping info for a group
136 137 138 139 140 141 142 143 144 145 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 136 def shard_info_by_group_id(group_id) # Load group group_info = group_info_by_id(group_id) raise ArgumentError, "Invalid group_id: #{group_id}" unless group_info shard_info = shard_info_by_id(group_info.shard_id) raise ArgumentError, "Invalid shard_id: #{group_info.shard_id}" unless shard_info return shard_info end |
#shard_info_by_id(shard_id, cache = true) ⇒ Object
Load shard info
116 117 118 119 120 121 122 123 124 125 |
# File 'lib/db_charmer/sharding/method/db_block_group_map_base.rb', line 116 def shard_info_by_id(shard_id, cache = true) # Cleanup the cache if asked to @shard_info_cache[shard_id] = nil unless cache # Either load from cache or from db @shard_info_cache[shard_id] ||= begin prepare_shard_models shard_class.find_by_id(shard_id) end end |