Class: Octopus::Proxy
- Inherits:
-
Object
- Object
- Octopus::Proxy
- Defined in:
- lib/octopus/proxy.rb
Instance Attribute Summary collapse
-
#config ⇒ Object
Returns the value of attribute config.
-
#sharded ⇒ Object
Returns the value of attribute sharded.
Instance Method Summary collapse
- #block ⇒ Object
- #block=(block) ⇒ Object
- #check_schema_migrations(shard) ⇒ Object
- #clean_connection_proxy ⇒ Object
- #clear_active_connections! ⇒ Object
- #clear_all_connections! ⇒ Object
- #clear_query_cache ⇒ Object
- #connected? ⇒ Boolean
- #connection_pool ⇒ Object
- #current_group ⇒ Object
- #current_group=(group_symbol) ⇒ Object
- #current_model ⇒ Object
- #current_model=(model) ⇒ Object
- #current_shard ⇒ Object
- #current_shard=(shard_symbol) ⇒ Object
- #current_slave_group ⇒ Object
- #current_slave_group=(slave_group_symbol) ⇒ Object
- #disable_query_cache! ⇒ Object
- #enable_query_cache! ⇒ Object
- #fully_replicated? ⇒ Boolean
-
#has_group?(group) ⇒ Boolean
Public: Whether or not a group exists with the given name converted to a string.
-
#initialize(config = Octopus.config) ⇒ Proxy
constructor
A new instance of Proxy.
- #initialize_replication(config) ⇒ Object
- #initialize_shards(config) ⇒ Object
- #last_current_shard ⇒ Object
- #last_current_shard=(last_current_shard) ⇒ Object
- #method_missing(method, *args, &block) ⇒ Object
- #respond_to?(method, include_private = false) ⇒ Boolean
- #run_queries_on_shard(shard, &block) ⇒ Object
-
#safe_connection(connection_pool) ⇒ Object
Rails 3.1 sets automatic_reconnect to false when it removes connection pool.
- #select_connection ⇒ Object
- #send_queries_to_multiple_shards(shards, &block) ⇒ Object
- #send_queries_to_shard_slave_group(method, *args, &block) ⇒ Object
- #send_queries_to_slave_group(method, *args, &block) ⇒ Object
- #shard_name ⇒ Object
-
#shard_names ⇒ Object
Public: Retrieves names of all loaded shards.
-
#shards_for_group(group) ⇒ Object
Public: Retrieves the defined shards for a given group.
- #should_clean_table_name? ⇒ Boolean
- #should_send_queries_to_shard_slave_group?(method) ⇒ Boolean
- #should_send_queries_to_slave_group?(method) ⇒ Boolean
- #transaction(options = {}, &block) ⇒ Object
Constructor Details
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method, *args, &block) ⇒ Object
256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 |
# File 'lib/octopus/proxy.rb', line 256 def method_missing(method, *args, &block) if should_clean_connection_proxy?(method) conn = select_connection() self.last_current_shard = self.current_shard clean_connection_proxy() conn.send(method, *args, &block) elsif should_send_queries_to_shard_slave_group?(method) send_queries_to_shard_slave_group(method, *args, &block) elsif should_send_queries_to_slave_group?(method) send_queries_to_slave_group(method, *args, &block) elsif should_send_queries_to_replicated_databases?(method) send_queries_to_selected_slave(method, *args, &block) else select_connection().send(method, *args, &block) end end |
Instance Attribute Details
#config ⇒ Object
Returns the value of attribute config.
6 7 8 |
# File 'lib/octopus/proxy.rb', line 6 def config @config end |
#sharded ⇒ Object
Returns the value of attribute sharded.
6 7 8 |
# File 'lib/octopus/proxy.rb', line 6 def sharded @sharded end |
Instance Method Details
#block ⇒ Object
155 156 157 |
# File 'lib/octopus/proxy.rb', line 155 def block Thread.current["octopus.block"] end |
#block=(block) ⇒ Object
159 160 161 |
# File 'lib/octopus/proxy.rb', line 159 def block=(block) Thread.current["octopus.block"] = block end |
#check_schema_migrations(shard) ⇒ Object
239 240 241 242 243 |
# File 'lib/octopus/proxy.rb', line 239 def check_schema_migrations(shard) if !OctopusModel.using(shard).connection.table_exists?(ActiveRecord::Migrator.schema_migrations_table_name()) OctopusModel.using(shard).connection.initialize_schema_migrations_table end end |
#clean_connection_proxy ⇒ Object
233 234 235 236 237 |
# File 'lib/octopus/proxy.rb', line 233 def clean_connection_proxy() self.current_shard = :master self.current_group = nil self.block = false end |
#clear_active_connections! ⇒ Object
294 295 296 |
# File 'lib/octopus/proxy.rb', line 294 def clear_active_connections! @shards.each { |k, v| v.release_connection } end |
#clear_all_connections! ⇒ Object
298 299 300 |
# File 'lib/octopus/proxy.rb', line 298 def clear_all_connections! @shards.each { |k, v| v.disconnect! } end |
#clear_query_cache ⇒ Object
290 291 292 |
# File 'lib/octopus/proxy.rb', line 290 def clear_query_cache @shards.each { |k, v| safe_connection(v).clear_query_cache } end |
#connected? ⇒ Boolean
302 303 304 |
# File 'lib/octopus/proxy.rb', line 302 def connected? @shards.any? { |k, v| v.connected? } end |
#connection_pool ⇒ Object
277 278 279 |
# File 'lib/octopus/proxy.rb', line 277 def connection_pool return @shards[current_shard] end |
#current_group ⇒ Object
134 135 136 |
# File 'lib/octopus/proxy.rb', line 134 def current_group Thread.current["octopus.current_group"] end |
#current_group=(group_symbol) ⇒ Object
138 139 140 141 142 143 144 145 |
# File 'lib/octopus/proxy.rb', line 138 def current_group=(group_symbol) # TODO: Error message should include all groups if given more than one bad name. [group_symbol].flatten.compact.each do |group| raise "Nonexistent Group Name: #{group}" unless has_group?(group) end Thread.current["octopus.current_group"] = group_symbol end |
#current_model ⇒ Object
91 92 93 |
# File 'lib/octopus/proxy.rb', line 91 def current_model Thread.current["octopus.current_model"] end |
#current_model=(model) ⇒ Object
95 96 97 |
# File 'lib/octopus/proxy.rb', line 95 def current_model=(model) Thread.current["octopus.current_model"] = model.is_a?(ActiveRecord::Base) ? model.class : model end |
#current_shard ⇒ Object
99 100 101 |
# File 'lib/octopus/proxy.rb', line 99 def current_shard Thread.current["octopus.current_shard"] ||= :master end |
#current_shard=(shard_symbol) ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/octopus/proxy.rb', line 103 def current_shard=(shard_symbol) self.current_slave_group = nil if shard_symbol.is_a?(Array) shard_symbol.each {|symbol| raise "Nonexistent Shard Name: #{symbol}" if @shards[symbol].nil? } elsif shard_symbol.is_a?(Hash) hash = shard_symbol shard_symbol = hash[:shard] slave_group_symbol = hash[:slave_group] if shard_symbol.nil? && slave_group_symbol.nil? raise "Neither shard or slave group must be specified" end if shard_symbol.present? raise "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil? end if slave_group_symbol.present? if (@shards_slave_groups.try(:[], shard_symbol).present? && @shards_slave_groups[shard_symbol][slave_group_symbol].nil?) || (@shards_slave_groups.try(:[], shard_symbol).nil? && @slave_groups[slave_group_symbol].nil?) raise "Nonexistent Slave Group Name: #{slave_group_symbol} in shards config: #{@shards_config.inspect}" end self.current_slave_group = slave_group_symbol end else raise "Nonexistent Shard Name: #{shard_symbol}" if @shards[shard_symbol].nil? end Thread.current["octopus.current_shard"] = shard_symbol end |
#current_slave_group ⇒ Object
147 148 149 |
# File 'lib/octopus/proxy.rb', line 147 def current_slave_group Thread.current["octopus.current_slave_group"] end |
#current_slave_group=(slave_group_symbol) ⇒ Object
151 152 153 |
# File 'lib/octopus/proxy.rb', line 151 def current_slave_group=(slave_group_symbol) Thread.current["octopus.current_slave_group"] = slave_group_symbol end |
#disable_query_cache! ⇒ Object
286 287 288 |
# File 'lib/octopus/proxy.rb', line 286 def disable_query_cache! @shards.each { |k, v| safe_connection(v).disable_query_cache! } end |
#enable_query_cache! ⇒ Object
281 282 283 284 |
# File 'lib/octopus/proxy.rb', line 281 def enable_query_cache! clear_query_cache @shards.each { |k, v| safe_connection(v).enable_query_cache! } end |
#fully_replicated? ⇒ Boolean
171 172 173 |
# File 'lib/octopus/proxy.rb', line 171 def fully_replicated? @fully_replicated || Thread.current["octopus.fully_replicated"] end |
#has_group?(group) ⇒ Boolean
Public: Whether or not a group exists with the given name converted to a string.
Returns a boolean.
179 180 181 |
# File 'lib/octopus/proxy.rb', line 179 def has_group?(group) @groups.has_key?(group.to_s) end |
#initialize_replication(config) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/octopus/proxy.rb', line 78 def initialize_replication(config) @replicated = true if config.has_key?("fully_replicated") @fully_replicated = config["fully_replicated"] else @fully_replicated = true end @slaves_list = @shards.keys.map {|sym| sym.to_s}.sort @slaves_list.delete('master') @slaves_load_balancer = Octopus::LoadBalancing::RoundRobin.new(@slaves_list) end |
#initialize_shards(config) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/octopus/proxy.rb', line 13 def initialize_shards(config) @shards = HashWithIndifferentAccess.new @shards_slave_groups = HashWithIndifferentAccess.new @slave_groups = HashWithIndifferentAccess.new @groups = {} @adapters = Set.new @config = ActiveRecord::Base.connection_pool_without_octopus.connection.instance_variable_get(:@config) if !config.nil? @entire_sharded = config['entire_sharded'] @shards_config = config[Octopus.rails_env()] end @shards_config ||= [] @shards_config.each do |key, value| if value.is_a?(String) value = resolve_string_connection(value).merge(:octopus_shard => key) initialize_adapter(value['adapter']) @shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection") elsif value.is_a?(Hash) && value.has_key?("adapter") value.merge!(:octopus_shard => key) initialize_adapter(value['adapter']) @shards[key.to_sym] = connection_pool_for(value, "#{value['adapter']}_connection") slave_group_configs = value.select do |k,v| structurally_slave_group? v end if slave_group_configs.present? slave_groups = HashWithIndifferentAccess.new slave_group_configs.each do |slave_group_name, slave_configs| slaves = HashWithIndifferentAccess.new slave_configs.each do |slave_name, slave_config| @shards[slave_name.to_sym] = connection_pool_for(slave_config, "#{value['adapter']}_connection") slaves[slave_name.to_sym] = slave_name.to_sym end slave_groups[slave_group_name.to_sym] = Octopus::SlaveGroup.new(slaves) end @shards_slave_groups[key.to_sym] = slave_groups @sharded = true end elsif value.is_a?(Hash) @groups[key.to_s] = [] value.each do |k, v| raise "You have duplicated shard names!" if @shards.has_key?(k.to_sym) initialize_adapter(v['adapter']) config_with_octopus_shard = v.merge(:octopus_shard => k) @shards[k.to_sym] = connection_pool_for(config_with_octopus_shard, "#{v['adapter']}_connection") @groups[key.to_s] << k.to_sym end if structurally_slave_group? value slaves = Hash[@groups[key.to_s].map { |v| [v, v ] }] @slave_groups[key.to_sym] = Octopus::SlaveGroup.new(slaves) end end end @shards[:master] ||= ActiveRecord::Base.connection_pool_without_octopus() end |
#last_current_shard ⇒ Object
163 164 165 |
# File 'lib/octopus/proxy.rb', line 163 def last_current_shard Thread.current["octopus.last_current_shard"] end |
#last_current_shard=(last_current_shard) ⇒ Object
167 168 169 |
# File 'lib/octopus/proxy.rb', line 167 def last_current_shard=(last_current_shard) Thread.current["octopus.last_current_shard"] = last_current_shard end |
#respond_to?(method, include_private = false) ⇒ Boolean
273 274 275 |
# File 'lib/octopus/proxy.rb', line 273 def respond_to?(method, include_private = false) super || select_connection.respond_to?(method, include_private) end |
#run_queries_on_shard(shard, &block) ⇒ Object
219 220 221 222 223 224 225 |
# File 'lib/octopus/proxy.rb', line 219 def run_queries_on_shard(shard, &block) keeping_connection_proxy do using_shard(shard) do yield end end end |
#safe_connection(connection_pool) ⇒ Object
Rails 3.1 sets automatic_reconnect to false when it removes connection pool. Octopus can potentially retain a reference to a closed connection pool. Previously, that would work since the pool would just reconnect, but in Rails 3.1 the flag prevents this.
202 203 204 205 |
# File 'lib/octopus/proxy.rb', line 202 def safe_connection(connection_pool) connection_pool.automatic_reconnect ||= true connection_pool.connection() end |
#select_connection ⇒ Object
207 208 209 |
# File 'lib/octopus/proxy.rb', line 207 def select_connection safe_connection(@shards[shard_name]) end |
#send_queries_to_multiple_shards(shards, &block) ⇒ Object
227 228 229 230 231 |
# File 'lib/octopus/proxy.rb', line 227 def send_queries_to_multiple_shards(shards, &block) shards.each do |shard| self.run_queries_on_shard(shard, &block) end end |
#send_queries_to_shard_slave_group(method, *args, &block) ⇒ Object
310 311 312 |
# File 'lib/octopus/proxy.rb', line 310 def send_queries_to_shard_slave_group(method, *args, &block) send_queries_to_balancer(@shards_slave_groups[current_shard][current_slave_group], method, *args, &block) end |
#send_queries_to_slave_group(method, *args, &block) ⇒ Object
318 319 320 |
# File 'lib/octopus/proxy.rb', line 318 def send_queries_to_slave_group(method, *args, &block) send_queries_to_balancer(@slave_groups[current_slave_group], method, *args, &block) end |
#shard_name ⇒ Object
211 212 213 |
# File 'lib/octopus/proxy.rb', line 211 def shard_name current_shard.is_a?(Array) ? current_shard.first : current_shard end |
#shard_names ⇒ Object
Public: Retrieves names of all loaded shards.
Returns an array of shard names as symbols
186 187 188 |
# File 'lib/octopus/proxy.rb', line 186 def shard_names @shards.keys end |
#shards_for_group(group) ⇒ Object
Public: Retrieves the defined shards for a given group.
Returns an array of shard names as symbols or nil if the group is not defined.
194 195 196 |
# File 'lib/octopus/proxy.rb', line 194 def shards_for_group(group) @groups.fetch(group.to_s, nil) end |
#should_clean_table_name? ⇒ Boolean
215 216 217 |
# File 'lib/octopus/proxy.rb', line 215 def should_clean_table_name? @adapters.size > 1 end |
#should_send_queries_to_shard_slave_group?(method) ⇒ Boolean
306 307 308 |
# File 'lib/octopus/proxy.rb', line 306 def should_send_queries_to_shard_slave_group?(method) should_use_slaves_for_method?(method) && @shards_slave_groups.try(:[], current_shard).try(:[], current_slave_group).present? end |
#should_send_queries_to_slave_group?(method) ⇒ Boolean
314 315 316 |
# File 'lib/octopus/proxy.rb', line 314 def should_send_queries_to_slave_group?(method) should_use_slaves_for_method?(method) && @slave_groups.try(:[], current_slave_group).present? end |
#transaction(options = {}, &block) ⇒ Object
245 246 247 248 249 250 251 252 253 254 |
# File 'lib/octopus/proxy.rb', line 245 def transaction( = {}, &block) replicated = @replicated && (current_model.replicated || fully_replicated?) if !sharded && replicated self.run_queries_on_shard(:master) do select_connection.transaction(, &block) end else select_connection.transaction(, &block) end end |