Class: DbCharmer::Sharding::Method::DbBlockMap
- Defined in:
- lib/db_charmer/sharding/method/db_block_map.rb
Defined Under Namespace
Classes: ShardInfo
Instance Attribute Summary collapse
-
#block_size ⇒ Object
Sharding keys block size.
-
#connection ⇒ Object
Mapping db connection.
-
#connection_name ⇒ Object
Mapping db connection.
-
#map_table ⇒ Object
Mapping table name.
-
#name ⇒ Object
Sharder name.
-
#shards_table ⇒ Object
Shards table name.
Instance Method Summary collapse
- #allocate_new_block_for_key(key) ⇒ Object
- #block_end_for_key(key) ⇒ Object
-
#block_for_key(key, cache = true) ⇒ Object
Returns a block for a key.
- #block_start_for_key(key) ⇒ Object
- #create_shard(params) ⇒ Object
- #get_cached_block(block_cache_key) ⇒ Object
-
#initialize(config) ⇒ DbBlockMap
constructor
A new instance of DbBlockMap.
- #least_loaded_shard ⇒ Object
-
#prepare_shard_model ⇒ Object
Prepare model for working with our shards table.
- #set_cached_block(block_cache_key, block) ⇒ Object
-
#shard_connection_config(shard) ⇒ Object
Create configuration (use mapping connection as a template).
- #shard_connections ⇒ Object
- #shard_for_key(key) ⇒ Object
-
#shard_info_by_id(shard_id, cache = true) ⇒ Object
Load shard info.
Constructor Details
#initialize(config) ⇒ DbBlockMap
Returns a new instance of DbBlockMap.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 24 def initialize(config) @name = config[:name] or raise(ArgumentError, "Missing required :name parameter!") @connection = DbCharmer::ConnectionFactory.connect(config[:connection], true) @block_size = (config[:block_size] || 10000).to_i @map_table = config[:map_table] or raise(ArgumentError, "Missing required :map_table parameter!") @shards_table = config[:shards_table] or raise(ArgumentError, "Missing required :shards_table parameter!") # Local caches @shard_info_cache = {} @blocks_cache = Rails.cache @blocks_cache_prefix = config[:blocks_cache_prefix] || "#{@name}_block:" end |
Instance Attribute Details
#block_size ⇒ Object
Sharding keys block size
22 23 24 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 22 def block_size @block_size end |
#connection ⇒ Object
Mapping db connection
13 14 15 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 13 def connection @connection end |
#connection_name ⇒ Object
Mapping db connection
13 14 15 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 13 def connection_name @connection_name end |
#map_table ⇒ Object
Mapping table name
16 17 18 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 16 def map_table @map_table end |
#name ⇒ Object
Sharder name
10 11 12 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 10 def name @name end |
#shards_table ⇒ Object
Shards table name
19 20 21 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 19 def shards_table @shards_table end |
Instance Method Details
#allocate_new_block_for_key(key) ⇒ Object
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 111 def allocate_new_block_for_key(key) # Can't find any shards to use for blocks allocation! return nil unless shard = least_loaded_shard # Figure out block limits start_id = block_start_for_key(key) end_id = block_end_for_key(key) # Try to insert a new mapping (ignore duplicate key errors) sql = <<-SQL INSERT INTO #{map_table} (start_id, end_id, shard_id, block_size, created_at, updated_at) VALUES (#{start_id}, #{end_id}, #{shard.id}, #{block_size}, NOW(), NOW()) SQL connection.execute(sql, "Allocate new block") # Increment the blocks counter on the shard ShardInfo.update_counters(shard.id, :blocks_count => +1) # Retry block search after creation block_for_key(key) end |
#block_end_for_key(key) ⇒ Object
146 147 148 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 146 def block_end_for_key(key) block_size.to_i + block_start_for_key(key) end |
#block_for_key(key, cache = true) ⇒ Object
Returns a block for a key
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 70 def block_for_key(key, cache = true) # Cleanup the cache if asked to key_range = [ block_start_for_key(key), block_end_for_key(key) ] block_cache_key = "%d-%d" % key_range if cache cached_block = get_cached_block(block_cache_key) return cached_block if cached_block end # Fetch cached value or load from db block = begin sql = "SELECT * FROM #{map_table} WHERE start_id = #{key_range.first} AND end_id = #{key_range.last} LIMIT 1" connection.select_one(sql, 'Find a shard block') end set_cached_block(block_cache_key, block) return block end |
#block_start_for_key(key) ⇒ Object
142 143 144 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 142 def block_start_for_key(key) block_size.to_i * (key.to_i / block_size.to_i) end |
#create_shard(params) ⇒ Object
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 170 def create_shard(params) params = params.symbolize_keys [ :db_host, :db_port, :db_user, :db_pass, :db_name ].each do |arg| raise ArgumentError, "Missing required parameter: #{arg}" unless params[arg] end # Prepare model prepare_shard_model # Create the record ShardInfo.create! do |shard| shard.db_host = params[:db_host] shard.db_port = params[:db_port] shard.db_user = params[:db_user] shard.db_pass = params[:db_pass] shard.db_name = params[:db_name] end end |
#get_cached_block(block_cache_key) ⇒ Object
91 92 93 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 91 def get_cached_block(block_cache_key) @blocks_cache.read("#{@blocks_cache_prefix}#{block_cache_key}") end |
#least_loaded_shard ⇒ Object
133 134 135 136 137 138 139 140 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 133 def least_loaded_shard prepare_shard_model # Select shard shard = ShardInfo.all(:conditions => { :enabled => true, :open => true }, :order => 'blocks_count ASC', :limit => 1).first raise "Can't find any shards to use for blocks allocation!" unless shard return shard end |
#prepare_shard_model ⇒ Object
Prepare model for working with our shards table
198 199 200 201 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 198 def prepare_shard_model ShardInfo.table_name = shards_table ShardInfo.switch_connection_to(connection) end |
#set_cached_block(block_cache_key, block) ⇒ Object
95 96 97 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 95 def set_cached_block(block_cache_key, block) @blocks_cache.write("#{@blocks_cache_prefix}#{block_cache_key}", block) end |
#shard_connection_config(shard) ⇒ Object
Create configuration (use mapping connection as a template)
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 151 def shard_connection_config(shard) # Format connection name shard_name = "db_charmer_db_block_map_#{name}_shard_%05d" % shard.id # Here we get the mapping connection's configuration # They do not expose configs so we hack in and get the instance var # FIXME: Find a better way, maybe move config method to our ar extenstions connection.instance_variable_get(:@config).clone.merge( # Name for the connection factory :connection_name => shard_name, # Connection params :host => shard.db_host, :port => shard.db_port, :username => shard.db_user, :password => shard.db_pass, :database => shard.db_name ) end |
#shard_connections ⇒ Object
189 190 191 192 193 194 195 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 189 def shard_connections # Find all shards prepare_shard_model shards = ShardInfo.all(:conditions => { :enabled => true }) # Map them to connections shards.map { |shard| shard_connection_config(shard) } end |
#shard_for_key(key) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 39 def shard_for_key(key) block = block_for_key(key) begin # Auto-allocate new blocks block ||= allocate_new_block_for_key(key) rescue ::ActiveRecord::StatementInvalid => e raise unless e..include?('Duplicate entry') || e..include?('duplicate key') block = block_for_key(key) end raise ArgumentError, "Invalid key value, no shards found for this key and could not create a new block!" unless block # Bail if no shard found shard_id = block['shard_id'].to_i shard_info = shard_info_by_id(shard_id) raise ArgumentError, "Invalid shard_id: #{shard_id}" unless shard_info # Get config shard_connection_config(shard_info) end |
#shard_info_by_id(shard_id, cache = true) ⇒ Object
Load shard info
100 101 102 103 104 105 106 107 108 109 |
# File 'lib/db_charmer/sharding/method/db_block_map.rb', line 100 def shard_info_by_id(shard_id, cache = true) # Cleanup the cache if asked to @shard_info_cache[shard_id] = nil unless cache # Either load from cache or from db @shard_info_cache[shard_id] ||= begin prepare_shard_model ShardInfo.find_by_id(shard_id) end end |